path: root/extras
author    Sean Owen <sowen@cloudera.com>  2016-03-09 18:27:44 +0000
committer Sean Owen <sowen@cloudera.com>  2016-03-09 18:27:44 +0000
commit    256704c771d301700af9ebf0d180c1ba7c4116c0 (patch)
tree      f9be79919b5c6ec4847c24a086fa844555e2cd12 /extras
parent    7791d0c3a9bdfe73e071266846f9ab1491fce50c (diff)
[SPARK-13595][BUILD] Move docker, extras modules into external
## What changes were proposed in this pull request?

Move `docker` dirs out of top level into `external/`; move `extras/*` into `external/`.

## How was this patch tested?

This is tested with Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11523 from srowen/SPARK-13595.
Diffstat (limited to 'extras')
-rw-r--r--  extras/README.md | 1
-rw-r--r--  extras/java8-tests/README.md | 24
-rw-r--r--  extras/java8-tests/pom.xml | 161
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 393
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 905
-rw-r--r--  extras/java8-tests/src/test/resources/log4j.properties | 28
-rw-r--r--  extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala | 27
-rw-r--r--  extras/kinesis-asl-assembly/pom.xml | 181
-rw-r--r--  extras/kinesis-asl/pom.xml | 87
-rw-r--r--  extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java | 189
-rw-r--r--  extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py | 83
-rw-r--r--  extras/kinesis-asl/src/main/resources/log4j.properties | 37
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala | 276
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala | 288
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala | 133
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala | 76
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 361
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala | 177
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala | 260
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala | 560
-rw-r--r--  extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java | 62
-rw-r--r--  extras/kinesis-asl/src/test/resources/log4j.properties | 27
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala | 72
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 259
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala | 152
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala | 46
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala | 210
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 297
-rw-r--r--  extras/spark-ganglia-lgpl/pom.xml | 49
-rw-r--r--  extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 90
30 files changed, 0 insertions, 5511 deletions
diff --git a/extras/README.md b/extras/README.md
deleted file mode 100644
index 1b4174b7d5..0000000000
--- a/extras/README.md
+++ /dev/null
@@ -1 +0,0 @@
-This directory contains build components not included by default in Spark's build.
diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md
deleted file mode 100644
index dc9e87f2ee..0000000000
--- a/extras/java8-tests/README.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# Java 8 Test Suites
-
-These tests require having Java 8 installed and are isolated from the main Spark build.
-If Java 8 is not your system's default Java version, you will need to point Spark's build
-to your Java location. The set-up depends a bit on the build system:
-
-* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
- `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
- include the Java 8 test project.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"`
-
-* For Maven users,
-
- Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not
- automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests`
- must be used.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
-
- Note that the above command can only be run from project root directory since this module
- depends on core and the test-jars of core and streaming. This means an install step is
- required to make the test dependencies visible to the Java 8 sub-project.
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
deleted file mode 100644
index 0ad9c5303a..0000000000
--- a/extras/java8-tests/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>java8-tests_2.11</artifactId>
- <packaging>pom</packaging>
- <name>Spark Project Java8 Tests POM</name>
-
- <properties>
- <sbt.project.name>java8-tests</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
-
- <profiles>
- <profile>
- <id>java8-tests</id>
- </profile>
- </profiles>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-install-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <systemPropertyVariables>
- <!-- For some reason surefire isn't setting this log4j file on the
- test classpath automatically. So we add it manually. -->
- <log4j.configuration>
- file:src/test/resources/log4j.properties
- </log4j.configuration>
- </systemPropertyVariables>
- <skipTests>false</skipTests>
- <includes>
- <include>**/Suite*.java</include>
- <include>**/*Suite.java</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <executions>
- <execution>
- <id>test-compile-first</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <fork>true</fork>
- <verbose>true</verbose>
- <forceJavacCompilerUse>true</forceJavacCompilerUse>
- <source>1.8</source>
- <compilerVersion>1.8</compilerVersion>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- <maxmem>1024m</maxmem>
- </configuration>
- </plugin>
- <plugin>
- <!-- disabled -->
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <executions>
- <execution>
- <phase>none</phase>
- </execution>
- <execution>
- <id>scala-compile-first</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>scala-test-compile-first</id>
- <phase>none</phase>
- </execution>
- <execution>
- <id>attach-scaladocs</id>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
deleted file mode 100644
index c0b58e713f..0000000000
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.util.Utils;
-
-/**
- * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
- * lambda syntax.
- */
-public class Java8APISuite implements Serializable {
- static int foreachCalls = 0;
- private transient JavaSparkContext sc;
-
- @Before
- public void setUp() {
- sc = new JavaSparkContext("local", "JavaAPISuite");
- }
-
- @After
- public void tearDown() {
- sc.stop();
- sc = null;
- }
-
- @Test
- public void foreachWithAnonymousClass() {
- foreachCalls = 0;
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String s) {
- foreachCalls++;
- }
- });
- Assert.assertEquals(2, foreachCalls);
- }
-
- @Test
- public void foreach() {
- foreachCalls = 0;
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
- rdd.foreach(x -> foreachCalls++);
- Assert.assertEquals(2, foreachCalls);
- }
-
- @Test
- public void groupBy() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
- JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
- Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
- Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
-
- oddsAndEvens = rdd.groupBy(isOdd, 1);
- Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
- Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
- }
-
- @Test
- public void leftOuterJoin() {
- JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(1, 2),
- new Tuple2<>(2, 1),
- new Tuple2<>(3, 1)
- ));
- JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>(1, 'x'),
- new Tuple2<>(2, 'y'),
- new Tuple2<>(2, 'z'),
- new Tuple2<>(4, 'w')
- ));
- List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
- rdd1.leftOuterJoin(rdd2).collect();
- Assert.assertEquals(5, joined.size());
- Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
- rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
- Assert.assertEquals(3, firstUnmatched._1().intValue());
- }
-
- @Test
- public void foldReduce() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
- Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
-
- int sum = rdd.fold(0, add);
- Assert.assertEquals(33, sum);
-
- sum = rdd.reduce(add);
- Assert.assertEquals(33, sum);
- }
-
- @Test
- public void foldByKey() {
- List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<>(2, 1),
- new Tuple2<>(2, 1),
- new Tuple2<>(1, 1),
- new Tuple2<>(3, 2),
- new Tuple2<>(3, 1)
- );
- JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
- Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
- Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
- Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
- }
-
- @Test
- public void reduceByKey() {
- List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
- new Tuple2<>(2, 1),
- new Tuple2<>(2, 1),
- new Tuple2<>(1, 1),
- new Tuple2<>(3, 2),
- new Tuple2<>(3, 1)
- );
- JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
- JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
- Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
- Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
- Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
- Map<Integer, Integer> localCounts = counts.collectAsMap();
- Assert.assertEquals(1, localCounts.get(1).intValue());
- Assert.assertEquals(2, localCounts.get(2).intValue());
- Assert.assertEquals(3, localCounts.get(3).intValue());
-
- localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
- Assert.assertEquals(1, localCounts.get(1).intValue());
- Assert.assertEquals(2, localCounts.get(2).intValue());
- Assert.assertEquals(3, localCounts.get(3).intValue());
- }
-
- @Test
- public void map() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
- doubles.collect();
- JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
- .cache();
- pairs.collect();
- JavaRDD<String> strings = rdd.map(Object::toString).cache();
- strings.collect();
- }
-
- @Test
- public void flatMap() {
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
- "The quick brown fox jumps over the lazy dog."));
- JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
-
- Assert.assertEquals("Hello", words.first());
- Assert.assertEquals(11, words.count());
-
- JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
- List<Tuple2<String, String>> pairs2 = new LinkedList<>();
- for (String word : s.split(" ")) {
- pairs2.add(new Tuple2<>(word, word));
- }
- return pairs2;
- });
-
- Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
- Assert.assertEquals(11, pairs.count());
-
- JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
- List<Double> lengths = new LinkedList<>();
- for (String word : s.split(" ")) {
- lengths.add((double) word.length());
- }
- return lengths;
- });
-
- Assert.assertEquals(5.0, doubles.first(), 0.01);
- Assert.assertEquals(11, pairs.count());
- }
-
- @Test
- public void mapsFromPairsToPairs() {
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
- // Regression test for SPARK-668:
- JavaPairRDD<String, Integer> swapped =
- pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
- swapped.collect();
-
- // There was never a bug here, but it's worth testing:
- pairRDD.map(Tuple2::swap).collect();
- }
-
- @Test
- public void mapPartitions() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
- JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
- int sum = 0;
- while (iter.hasNext()) {
- sum += iter.next();
- }
- return Collections.singletonList(sum);
- });
-
- Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
- }
-
- @Test
- public void sequenceFile() {
- File tempDir = Files.createTempDir();
- tempDir.deleteOnExit();
- String outputDir = new File(tempDir, "output").getAbsolutePath();
- List<Tuple2<Integer, String>> pairs = Arrays.asList(
- new Tuple2<>(1, "a"),
- new Tuple2<>(2, "aa"),
- new Tuple2<>(3, "aaa")
- );
- JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
- rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
- // Try reading the output back as an object file
- JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
- .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
- Assert.assertEquals(pairs, readRDD.collect());
- Utils.deleteRecursively(tempDir);
- }
-
- @Test
- public void zip() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
- JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
- zipped.count();
- }
-
- @Test
- public void zipPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
- JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
- FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
- (Iterator<Integer> i, Iterator<String> s) -> {
- int sizeI = 0;
- while (i.hasNext()) {
- sizeI += 1;
- i.next();
- }
- int sizeS = 0;
- while (s.hasNext()) {
- sizeS += 1;
- s.next();
- }
- return Arrays.asList(sizeI, sizeS).iterator();
- };
- JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
- Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
- }
-
- @Test
- public void accumulators() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
- Accumulator<Integer> intAccum = sc.intAccumulator(10);
- rdd.foreach(intAccum::add);
- Assert.assertEquals((Integer) 25, intAccum.value());
-
- Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
- rdd.foreach(x -> doubleAccum.add((double) x));
- Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
- // Try a custom accumulator type
- AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
- @Override
- public Float addInPlace(Float r, Float t) {
- return r + t;
- }
- @Override
- public Float addAccumulator(Float r, Float t) {
- return r + t;
- }
- @Override
- public Float zero(Float initialValue) {
- return 0.0f;
- }
- };
-
- Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
- rdd.foreach(x -> floatAccum.add((float) x));
- Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
- // Test the setValue method
- floatAccum.setValue(5.0f);
- Assert.assertEquals((Float) 5.0f, floatAccum.value());
- }
-
- @Test
- public void keyBy() {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
- List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
- Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
- Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
- }
-
- @Test
- public void mapOnPairRDD() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
- JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
- JavaPairRDD<Integer, Integer> rdd3 =
- rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
- Assert.assertEquals(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(0, 2),
- new Tuple2<>(1, 3),
- new Tuple2<>(0, 4)), rdd3.collect());
- }
-
- @Test
- public void collectPartitions() {
- JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
- JavaPairRDD<Integer, Integer> rdd2 =
- rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
- List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
- Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
- parts = rdd1.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
- Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
- Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
- rdd2.collectPartitions(new int[]{0})[0]);
-
- List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
- Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
- Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
- parts2[1]);
- }
-
- @Test
- public void collectAsMapWithIntArrayValues() {
- // Regression test for SPARK-1040
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
- JavaPairRDD<Integer, int[]> pairRDD =
- rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
- pairRDD.collect(); // Works fine
- pairRDD.collectAsMap(); // Used to crash with ClassCastException
- }
-}
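
For context, the deleted `Java8APISuite` above exercises Spark's Java API (`JavaSparkContext`, `mapToPair`, `reduceByKey`, and friends) through Java 8 lambda syntax instead of anonymous `Function` classes. A minimal, self-contained sketch of that usage pattern might look like the following; the class name, app name, and input data are illustrative and not taken from the moved files, and the `Iterable`-returning `flatMap` lambda mirrors the API as used in this suite:

```java
// Minimal sketch (not part of the diff) of the lambda-based Java API usage the
// suite above replicates from JavaAPISuite; app name and input are placeholders.
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LambdaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "LambdaWordCount");
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("Hello World", "Hello Spark"));

    // flatMap + mapToPair + reduceByKey written as Java 8 lambdas rather than
    // anonymous inner classes, exactly the style the suite tests.
    JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);

    List<Tuple2<String, Integer>> result = counts.collect();
    result.forEach(t -> System.out.println(t._1() + ": " + t._2()));
    sc.stop();
  }
}
```

The anonymous-inner-class form of the same chain is what the pre-Java-8 `JavaAPISuite` covers, which is why these lambda variants live in a separate module that only builds when a Java 8 JDK is available.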
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
deleted file mode 100644
index 604d818ef1..0000000000
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ /dev/null
@@ -1,905 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
-
-/**
- * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8
- * lambda syntax.
- */
-@SuppressWarnings("unchecked")
-public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
-
- @Test
- public void testMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"),
- Arrays.asList("goodnight", "moon"));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(5, 5),
- Arrays.asList(9, 4));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(String::length);
- JavaTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("yankees"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testMapPartitions() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOX"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(in -> {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- });
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduce() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(15),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByWindow() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(21),
- Arrays.asList(39),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reducedWindowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testTransform() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3),
- Arrays.asList(4, 5, 6),
- Arrays.asList(7, 8, 9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3, 4, 5),
- Arrays.asList(6, 7, 8),
- Arrays.asList(9, 10, 11));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
-
- JavaTestUtils.attachTestOutputStream(transformed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testVariousTransform() {
- // tests whether all variations of transform can be called from Java
-
- List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
-
- JavaDStream<Integer> transformed1 = stream.transform(in -> null);
- JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
- JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
- JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
- JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
- JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
- JavaPairDStream<String, String> pairTransformed4 =
- pairStream.transformToPair((x, time) -> null);
-
- }
-
- @Test
- public void testTransformWith() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(
- new Tuple2<>("california", "sharks"),
- new Tuple2<>("new york", "rangers")));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "islanders")));
-
-
- List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("dodgers", "giants")),
- new Tuple2<>("new york",
- new Tuple2<>("yankees", "mets"))),
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("sharks", "ducks")),
- new Tuple2<>("new york",
- new Tuple2<>("rangers", "islanders"))));
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<String, String>> joined =
- pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y));
-
- JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
- for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
- unorderedResult.add(Sets.newHashSet(res));
- }
-
- Assert.assertEquals(expected, unorderedResult);
- }
-
-
- @Test
- public void testVariousTransformWith() {
- // tests whether all variations of transformWith can be called from Java
-
- List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
- List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
- JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
- JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
- JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
-
- JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
- JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> transformed3 =
- stream1.transformWithToPair(stream2,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> transformed4 =
- stream1.transformWithToPair(pairStream1,(x, y, z) -> null);
-
- JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null);
-
- JavaDStream<Double> pairTransformed2_ =
- pairStream1.transformWith(pairStream1,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> pairTransformed3 =
- pairStream1.transformWithToPair(stream2,(x, y, z) -> null);
-
- JavaPairDStream<Double, Double> pairTransformed4 =
- pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null);
- }
-
- @Test
- public void testStreamingContextTransform() {
- List<List<Integer>> stream1input = Arrays.asList(
- Arrays.asList(1),
- Arrays.asList(2)
- );
-
- List<List<Integer>> stream2input = Arrays.asList(
- Arrays.asList(3),
- Arrays.asList(4)
- );
-
- List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, "x")),
- Arrays.asList(new Tuple2<>(2, "y"))
- );
-
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
- Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
- );
-
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
- JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
- JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
-
- List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
-
- // This is just to test whether this transform to JavaStream compiles
- JavaDStream<Long> transformed1 = ssc.transform(
- listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- Assert.assertEquals(2, listOfRDDs.size());
- return null;
- });
-
- List<JavaDStream<?>> listOfDStreams2 =
- Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
-
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
- listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
- Assert.assertEquals(3, listOfRDDs.size());
- JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
- JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
- JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
- JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
- PairFunction<Integer, Integer, Integer> mapToTuple =
- (Integer i) -> new Tuple2<>(i, i);
- return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
- });
- JavaTestUtils.attachTestOutputStream(transformed2);
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
- JavaTestUtils.runStreams(ssc, 2, 2);
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("go", "giants"),
- Arrays.asList("boo", "dodgers"),
- Arrays.asList("athletics"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
- Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
- Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,1,1),
- Arrays.asList(1,1,1));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
-
- stream.foreachRDD(rdd -> {
- accumRdd.add(1);
- rdd.foreach(x -> accumEle.add(1));
- });
-
- // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD((rdd, time) -> null);
-
- JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(2, accumRdd.value().intValue());
- Assert.assertEquals(6, accumEle.value().intValue());
- }
-
- @Test
- public void testPairFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("dodgers"),
- Arrays.asList("athletics"));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(6, "g"),
- new Tuple2<>(6, "i"),
- new Tuple2<>(6, "a"),
- new Tuple2<>(6, "n"),
- new Tuple2<>(6, "t"),
- new Tuple2<>(6, "s")),
- Arrays.asList(
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "o"),
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "g"),
- new Tuple2<>(7, "e"),
- new Tuple2<>(7, "r"),
- new Tuple2<>(7, "s")),
- Arrays.asList(
- new Tuple2<>(9, "a"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "h"),
- new Tuple2<>(9, "l"),
- new Tuple2<>(9, "e"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "i"),
- new Tuple2<>(9, "c"),
- new Tuple2<>(9, "s")));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter : s.split("(?!^)")) {
- out.add(new Tuple2<>(s.length(), letter));
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- /*
- * Performs an order-invariant comparison of lists representing two RDD streams. This allows
- * us to account for ordering variation within individual RDD's which occurs during windowing.
- */
- public static <T extends Comparable<T>> void assertOrderInvariantEquals(
- List<List<T>> expected, List<List<T>> actual) {
- expected.forEach(list -> Collections.sort(list));
- List<List<T>> sortedActual = new ArrayList<>();
- actual.forEach(list -> {
- List<T> sortedList = new ArrayList<>(list);
- Collections.sort(sortedList);
- sortedActual.add(sortedList);
- });
- Assert.assertEquals(expected, sortedActual);
- }
-
- @Test
- public void testPairFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("giants", 6)),
- Arrays.asList(new Tuple2<>("yankees", 7)));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream =
- stream.mapToPair(x -> new Tuple2<>(x, x.length()));
- JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "yankees"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(new Tuple2<>("california", "sharks"),
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "rangers"),
- new Tuple2<>("new york", "islanders")));
-
- List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 1),
- new Tuple2<>("california", 3),
- new Tuple2<>("new york", 4),
- new Tuple2<>("new york", 1)),
- Arrays.asList(
- new Tuple2<>("california", 5),
- new Tuple2<>("california", 5),
- new Tuple2<>("new york", 3),
- new Tuple2<>("new york", 1)));
-
- @Test
- public void testPairMap() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairMapPartitions() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
- LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
- while (in.hasNext()) {
- Tuple2<String, Integer> next = in.next();
- out.add(next.swap());
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairMap2() { // Maps pair -> single
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1, 3, 4, 1),
- Arrays.asList(5, 5, 3, 1));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
- List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)),
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")),
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<>(in._2(), s.toString()));
- }
- return out;
- });
-
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairReduceByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
-
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testCombineByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
- (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
-
- JavaTestUtils.attachTestOutputStream(combined);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByKeyAndWindow() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testUpdateStateByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
- int out = 0;
- if (state.isPresent()) {
- out = out + state.get();
- }
- for (Integer v : values) {
- out = out + v;
- }
- return Optional.of(out);
- });
-
- JavaTestUtils.attachTestOutputStream(updated);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testReduceByKeyAndWindowWithInverse() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
- new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)),
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
-
- JavaTestUtils.attachTestOutputStream(sorted);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testPairToNormalRDDTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3, 1, 4, 2),
- Arrays.asList(2, 3, 4, 1));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
- JavaTestUtils.attachTestOutputStream(firstParts);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "DODGERS"),
- new Tuple2<>("california", "GIANTS"),
- new Tuple2<>("new york", "YANKEES"),
- new Tuple2<>("new york", "METS")),
- Arrays.asList(new Tuple2<>("california", "SHARKS"),
- new Tuple2<>("california", "DUCKS"),
- new Tuple2<>("new york", "RANGERS"),
- new Tuple2<>("new york", "ISLANDERS")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @Test
- public void testFlatMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers1"),
- new Tuple2<>("california", "dodgers2"),
- new Tuple2<>("california", "giants1"),
- new Tuple2<>("california", "giants2"),
- new Tuple2<>("new york", "yankees1"),
- new Tuple2<>("new york", "yankees2"),
- new Tuple2<>("new york", "mets1"),
- new Tuple2<>("new york", "mets2")),
- Arrays.asList(new Tuple2<>("california", "sharks1"),
- new Tuple2<>("california", "sharks2"),
- new Tuple2<>("california", "ducks1"),
- new Tuple2<>("california", "ducks2"),
- new Tuple2<>("new york", "rangers1"),
- new Tuple2<>("new york", "rangers2"),
- new Tuple2<>("new york", "islanders1"),
- new Tuple2<>("new york", "islanders2")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> flatMapped =
- pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- Assert.assertEquals(expected, result);
- }
-
- /**
- * This test is only for testing the APIs. It's not necessary to run it.
- */
- public void testMapWithStateAPI() {
- JavaPairRDD<String, Boolean> initialRDD = null;
- JavaPairDStream<String, Integer> wordsDstream = null;
-
- JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
-
- JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
-
- JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
-
- JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
- }
-}
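
Similarly, the streaming `Java8APISuite` above applies the same lambda style to the DStream API (`map`, `filter`, `reduceByKeyAndWindow`, `transform`, and so on). A minimal sketch of that pattern, assuming a local master, a one-second batch interval, and an illustrative socket source on localhost:9999 (none of which come from the diff), could look like:

```java
// Minimal sketch (not part of the diff) of DStream transformations written as
// Java 8 lambdas; master, app name, host, and port are illustrative assumptions.
import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LambdaStreamingWordCount {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("LambdaStreamingWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

    // Illustrative source; the suite above uses JavaTestUtils test streams instead.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);

    // flatMap / mapToPair / reduceByKeyAndWindow as lambdas, matching the
    // windowed-reduction pattern tested in the suite.
    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));
    JavaPairDStream<String, Integer> counts = words
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKeyAndWindow((a, b) -> a + b, new Duration(2000), new Duration(1000));

    counts.print();
    ssc.start();
    ssc.awaitTermination();
  }
}
```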
diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties
deleted file mode 100644
index eb3b1999eb..0000000000
--- a/extras/java8-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-org.spark-project.jetty.LEVEL=WARN
diff --git a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
deleted file mode 100644
index fa0681db41..0000000000
--- a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Test cases where JDK8-compiled Scala user code is used with Spark.
- */
-class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
- test("basic RDD closure test (SPARK-6152)") {
- sc.parallelize(1 to 1000).map(x => x * x).count()
- }
-}
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
deleted file mode 100644
index d1c38c7ca5..0000000000
--- a/extras/kinesis-asl-assembly/pom.xml
+++ /dev/null
@@ -1,181 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project Kinesis Assembly</name>
- <url>http://spark.apache.org/</url>
-
- <properties>
- <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <!--
- Demote dependencies already included in the Spark assembly to provided scope.
- -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <classifier>${avro.mapred.classifier}</classifier>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resource>log4j.properties</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
-
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
deleted file mode 100644
index 935155eb5d..0000000000
--- a/extras/kinesis-asl/pom.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Kinesis Integration</name>
-
- <properties>
- <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-client</artifactId>
- <version>${aws.kinesis.client.version}</version>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>amazon-kinesis-producer</artifactId>
- <version>${aws.kinesis.producer.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
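
As the comment in this pom notes, the Kinesis connector is not part of the default Spark build because of its ASL-licensed dependency, so applications have to declare it themselves. A minimal sbt sketch of that dependency (coordinates taken from this pom; in practice the version would match the Spark release actually in use):

    // build.sbt (sketch)
    libraryDependencies ++= Seq(
      // Provided by the cluster at runtime.
      "org.apache.spark" %% "spark-streaming" % "2.0.0-SNAPSHOT" % "provided",
      // The ASL-licensed Kinesis connector has to be bundled with the application.
      "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.0.0-SNAPSHOT"
    )
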
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
deleted file mode 100644
index 5dc825dfdc..0000000000
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.examples.streaming;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import com.amazonaws.regions.RegionUtils;
-import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kinesis.KinesisUtils;
-
-import scala.Tuple2;
-
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-/**
- * Consumes messages from an Amazon Kinesis stream and does wordcount.
- *
- * This example spins up 1 Kinesis Receiver per shard for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given stream.
- *
- * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
- * [app-name] is the name of the consumer app, used to track the read data in DynamoDB
- * [stream-name] name of the Kinesis stream (e.g. mySparkStream)
- * [endpoint-url] endpoint of the Kinesis service
- * (e.g. https://kinesis.us-east-1.amazonaws.com)
- *
- *
- * Example:
- * # export AWS keys if necessary
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- *
- * # run the example
- * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
- *
- * There is a companion helper class called KinesisWordProducerASL which puts dummy data
- * onto the Kinesis stream.
- *
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
- * in the following order:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- * For more information, see
- * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
- *
- * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- * the Kinesis Spark Streaming integration.
- */
-public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
- private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
- private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
-
- public static void main(String[] args) {
- // Check that all required args were passed in.
- if (args.length != 3) {
- System.err.println(
- "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
- " <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
- " <stream-name> is the name of the Kinesis stream\n" +
- " <endpoint-url> is the endpoint of the Kinesis service\n" +
- " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
- "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
- "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
- "details.\n"
- );
- System.exit(1);
- }
-
- // Set default log4j logging level to WARN to hide Spark logs
- StreamingExamples.setStreamingLogLevels();
-
- // Populate the appropriate variables from the given args
- String kinesisAppName = args[0];
- String streamName = args[1];
- String endpointUrl = args[2];
-
- // Create a Kinesis client in order to determine the number of shards for the given stream
- AmazonKinesisClient kinesisClient =
- new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
- kinesisClient.setEndpoint(endpointUrl);
- int numShards =
- kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
-
-
- // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
- // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
- // then the shards will be automatically distributed among the receivers and each receiver
- // will receive data from multiple shards.
- int numStreams = numShards;
-
- // Spark Streaming batch interval
- Duration batchInterval = new Duration(2000);
-
- // Kinesis checkpoint interval. Same as batchInterval for this example.
- Duration kinesisCheckpointInterval = batchInterval;
-
- // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
- // DynamoDB of the same region as the Kinesis stream
- String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
-
- // Setup the Spark config and StreamingContext
- SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
-
- // Create the Kinesis DStreams
- List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
- for (int i = 0; i < numStreams; i++) {
- streamsList.add(
- KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
- InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
- );
- }
-
- // Union all the streams if there is more than 1 stream
- JavaDStream<byte[]> unionStreams;
- if (streamsList.size() > 1) {
- unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
- } else {
- // Otherwise, just use the 1 stream
- unionStreams = streamsList.get(0);
- }
-
- // Convert each line of Array[Byte] to String, and split into words
- JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
- @Override
- public Iterator<String> call(byte[] line) {
- String s = new String(line, StandardCharsets.UTF_8);
- return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
- }
- });
-
- // Map each word to a (word, 1) tuple so we can reduce by key to count the words
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }
- ).reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }
- );
-
- // Print the first 10 wordCounts
- wordCounts.print();
-
- // Start the streaming context and await termination
- jssc.start();
- jssc.awaitTermination();
- }
-}
diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
deleted file mode 100644
index 51f8c5ca66..0000000000
--- a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
- Consumes messages from an Amazon Kinesis stream and does wordcount.
-
- This example spins up 1 Kinesis Receiver per shard for the given stream.
- It then starts pulling from the last checkpointed sequence number of the given stream.
-
- Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
- <app-name> is the name of the consumer app, used to track the read data in DynamoDB
- <stream-name> name of the Kinesis stream (e.g. mySparkStream)
- <endpoint-url> endpoint of the Kinesis service
- (e.g. https://kinesis.us-east-1.amazonaws.com)
-
-
- Example:
- # export AWS keys if necessary
- $ export AWS_ACCESS_KEY_ID=<your-access-key>
- $ export AWS_SECRET_KEY=<your-secret-key>
-
- # run the example
- $ bin/spark-submit --jars extras/kinesis-asl/target/scala-*/\
- spark-streaming-kinesis-asl-assembly_*.jar \
- extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
- myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
-
- There is a companion helper class called KinesisWordProducerASL which puts dummy data
- onto the Kinesis stream.
-
- This code uses the DefaultAWSCredentialsProviderChain to find credentials
- in the following order:
- Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- Java System Properties - aws.accessKeyId and aws.secretKey
- Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- Instance profile credentials - delivered through the Amazon EC2 metadata service
- For more information, see
- http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
-
- See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- the Kinesis Spark Streaming integration.
-"""
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.streaming import StreamingContext
-from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
-
-if __name__ == "__main__":
- if len(sys.argv) != 5:
- print(
- "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
- file=sys.stderr)
- sys.exit(-1)
-
- sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
- ssc = StreamingContext(sc, 1)
- appName, streamName, endpointUrl, regionName = sys.argv[1:]
- lines = KinesisUtils.createStream(
- ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
- counts = lines.flatMap(lambda line: line.split(" ")) \
- .map(lambda word: (word, 1)) \
- .reduceByKey(lambda a, b: a+b)
- counts.pprint()
-
- ssc.start()
- ssc.awaitTermination()
diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties
deleted file mode 100644
index 6cdc9286c5..0000000000
--- a/extras/kinesis-asl/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-log4j.rootCategory=WARN, console
-
-# File appender
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
-
-# Console appender
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.out
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
deleted file mode 100644
index 6a73bc0e30..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import java.nio.ByteBuffer
-
-import scala.util.Random
-
-import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.PutRecordRequest
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
-
-
-/**
- * Consumes messages from an Amazon Kinesis stream and does wordcount.
- *
- * This example spins up 1 Kinesis Receiver per shard for the given stream.
- * It then starts pulling from the last checkpointed sequence number of the given stream.
- *
- * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
- * <app-name> is the name of the consumer app, used to track the read data in DynamoDB
- * <stream-name> name of the Kinesis stream (e.g. mySparkStream)
- * <endpoint-url> endpoint of the Kinesis service
- * (e.g. https://kinesis.us-east-1.amazonaws.com)
- *
- *
- * Example:
- * # export AWS keys if necessary
- * $ export AWS_ACCESS_KEY_ID=<your-access-key>
- * $ export AWS_SECRET_KEY=<your-secret-key>
- *
- * # run the example
- * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com
- *
- * There is a companion helper class called KinesisWordProducerASL which puts dummy data
- * onto the Kinesis stream.
- *
- * This code uses the DefaultAWSCredentialsProviderChain to find credentials
- * in the following order:
- * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- * Java System Properties - aws.accessKeyId and aws.secretKey
- * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
- * Instance profile credentials - delivered through the Amazon EC2 metadata service
- * For more information, see
- * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
- *
- * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
- * the Kinesis Spark Streaming integration.
- */
-object KinesisWordCountASL extends Logging {
- def main(args: Array[String]) {
- // Check that all required args were passed in.
- if (args.length != 3) {
- System.err.println(
- """
- |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
- |
- | <app-name> is the name of the consumer app, used to track the read data in DynamoDB
- | <stream-name> is the name of the Kinesis stream
- | <endpoint-url> is the endpoint of the Kinesis service
- | (e.g. https://kinesis.us-east-1.amazonaws.com)
- |
- |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
- |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
- |details.
- """.stripMargin)
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- // Populate the appropriate variables from the given args
- val Array(appName, streamName, endpointUrl) = args
-
-
- // Determine the number of shards from the stream using the low-level Kinesis Client
- // from the AWS Java SDK.
- val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
- require(credentials != null,
- "No AWS credentials found. Please specify credentials using one of the methods specified " +
- "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
- val kinesisClient = new AmazonKinesisClient(credentials)
- kinesisClient.setEndpoint(endpointUrl)
- val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
-
-
- // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
- // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
- // then the shards will be automatically distributed among the receivers and each receiver
- // will receive data from multiple shards.
- val numStreams = numShards
-
- // Spark Streaming batch interval
- val batchInterval = Milliseconds(2000)
-
- // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
- // on sequence number of records that have been received. Same as batchInterval for this
- // example.
- val kinesisCheckpointInterval = batchInterval
-
- // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
- // DynamoDB of the same region as the Kinesis stream
- val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
-
- // Setup the SparkConfig and StreamingContext
- val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
- val ssc = new StreamingContext(sparkConfig, batchInterval)
-
- // Create the Kinesis DStreams
- val kinesisStreams = (0 until numStreams).map { i =>
- KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
- InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
- }
-
- // Union all the streams
- val unionStreams = ssc.union(kinesisStreams)
-
- // Convert each line of Array[Byte] to String, and split into words
- val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
-
- // Map each word to a (word, 1) tuple so we can reduce by key to count the words
- val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
-
- // Print the first 10 wordCounts
- wordCounts.print()
-
- // Start the streaming context and await termination
- ssc.start()
- ssc.awaitTermination()
- }
-}
-
-/**
- * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
- * <records-per-sec> <words-per-record>
- *
- * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
- * <endpoint-url> is the endpoint of the Kinesis service
- * (e.g. https://kinesis.us-east-1.amazonaws.com)
- * <records-per-sec> is the rate of records per second to put onto the stream
- * <words-per-record> is the number of words per record
- *
- * Example:
- * $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
- * https://kinesis.us-east-1.amazonaws.com 10 5
- */
-object KinesisWordProducerASL {
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println(
- """
- |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
- |  <words-per-record>
- |
- | <stream-name> is the name of the Kinesis stream
- | <endpoint-url> is the endpoint of the Kinesis service
- | (e.g. https://kinesis.us-east-1.amazonaws.com)
- | <records-per-sec> is the rate of records per second to put onto the stream
- | <words-per-record> is the number of words per record to put onto the stream
- |
- """.stripMargin)
-
- System.exit(1)
- }
-
- // Set default log4j logging level to WARN to hide Spark logs
- StreamingExamples.setStreamingLogLevels()
-
- // Populate the appropriate variables from the given args
- val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
-
- // Generate the records and return the totals
- val totals = generate(stream, endpoint, recordsPerSecond.toInt,
- wordsPerRecord.toInt)
-
- // Print the array of (word, total) tuples
- println("Totals for the words sent")
- totals.foreach(println(_))
- }
-
- def generate(stream: String,
- endpoint: String,
- recordsPerSecond: Int,
- wordsPerRecord: Int): Seq[(String, Int)] = {
-
- val randomWords = List("spark", "you", "are", "my", "father")
- val totals = scala.collection.mutable.Map[String, Int]()
-
- // Create the low-level Kinesis Client from the AWS Java SDK.
- val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
- kinesisClient.setEndpoint(endpoint)
-
- println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
- s" $recordsPerSecond records per second and $wordsPerRecord words per record")
-
- // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
- for (i <- 1 to 10) {
- // Generate recordsPerSec records to put onto the stream
- (1 to recordsPerSecond).foreach { recordNum =>
- // Randomly generate wordsPerRecord number of words
- val data = (1 to wordsPerRecord).map(x => {
- // Get a random index to a word
- val randomWordIdx = Random.nextInt(randomWords.size)
- val randomWord = randomWords(randomWordIdx)
-
- // Increment total count to compare to server counts later
- totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
-
- randomWord
- }).mkString(" ")
-
- // Create a partitionKey based on recordNum
- val partitionKey = s"partitionKey-$recordNum"
-
- // Create a PutRecordRequest with an Array[Byte] version of the data
- val putRecordRequest = new PutRecordRequest().withStreamName(stream)
- .withPartitionKey(partitionKey)
- .withData(ByteBuffer.wrap(data.getBytes()))
-
- // Put the record onto the stream and capture the PutRecordResult
- val putRecordResult = kinesisClient.putRecord(putRecordRequest)
- }
-
- // Sleep for a second
- Thread.sleep(1000)
- println("Sent " + recordsPerSecond + " records")
- }
- // Convert the totals to a sorted sequence of (word, total) tuples
- totals.toSeq.sortBy(_._1)
- }
-}
-
-/**
- * Utility functions for Spark Streaming examples.
- * This has been lifted from the examples/ project to remove the circular dependency.
- */
-private[streaming] object StreamingExamples extends Logging {
- // Set reasonable logging levels for streaming if the user has not configured log4j.
- def setStreamingLogLevels() {
- val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
- if (!log4jInitialized) {
- // We first log something to initialize Spark's default logging, then we override the
- // logging level.
- logInfo("Setting log level to [WARN] for streaming example." +
- " To override add a custom log4j.properties to the classpath.")
- Logger.getRootLogger.setLevel(Level.WARN)
- }
- }
-}
-// scalastyle:on println
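
Both word-count examples in this file receive the default DStream[Array[Byte]] of raw record payloads. KinesisUtils (whose defaultMessageHandler is referenced by KinesisBackedBlockRDD below) also accepts a custom messageHandler: Record => T, which is useful when the partition key or sequence number is needed as well. The handler below is a hypothetical sketch, not code from this patch:

    import java.nio.charset.StandardCharsets

    import com.amazonaws.services.kinesis.model.Record

    // Keep each record's partition key next to its UTF-8 payload.
    def keyedPayload(record: Record): (String, String) = {
      val buf = record.getData
      val bytes = new Array[Byte](buf.remaining())
      buf.get(bytes)
      (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
    }

    // Passed in place of the default handler, e.g.:
    //   KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    //     InitialPositionInStream.LATEST, kinesisCheckpointInterval,
    //     StorageLevel.MEMORY_AND_DISK_2, keyedPayload)
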
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
deleted file mode 100644
index 3996f168e6..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
-import com.amazonaws.services.kinesis.model._
-
-import org.apache.spark._
-import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
-import org.apache.spark.storage.BlockId
-import org.apache.spark.util.NextIterator
-
-
-/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
-private[kinesis]
-case class SequenceNumberRange(
- streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
-
-/** Class representing an array of Kinesis sequence number ranges */
-private[kinesis]
-case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
- def isEmpty(): Boolean = ranges.isEmpty
-
- def nonEmpty(): Boolean = ranges.nonEmpty
-
- override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
-}
-
-private[kinesis]
-object SequenceNumberRanges {
- def apply(range: SequenceNumberRange): SequenceNumberRanges = {
- new SequenceNumberRanges(Seq(range))
- }
-}
-
-
-/** Partition storing the information of the ranges of Kinesis sequence numbers to read */
-private[kinesis]
-class KinesisBackedBlockRDDPartition(
- idx: Int,
- blockId: BlockId,
- val isBlockIdValid: Boolean,
- val seqNumberRanges: SequenceNumberRanges
- ) extends BlockRDDPartition(blockId, idx)
-
-/**
- * A BlockRDD where the block data is backed by Kinesis, which can be accessed using the
- * sequence numbers of the corresponding blocks.
- */
-private[kinesis]
-class KinesisBackedBlockRDD[T: ClassTag](
- sc: SparkContext,
- val regionName: String,
- val endpointUrl: String,
- @transient private val _blockIds: Array[BlockId],
- @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
- @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
- val retryTimeoutMs: Int = 10000,
- val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
- val awsCredentialsOption: Option[SerializableAWSCredentials] = None
- ) extends BlockRDD[T](sc, _blockIds) {
-
- require(_blockIds.length == arrayOfseqNumberRanges.length,
- "Number of blockIds is not equal to the number of sequence number ranges")
-
- override def isValid(): Boolean = true
-
- override def getPartitions: Array[Partition] = {
- Array.tabulate(_blockIds.length) { i =>
- val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
- new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
- }
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- val blockManager = SparkEnv.get.blockManager
- val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition]
- val blockId = partition.blockId
-
- def getBlockFromBlockManager(): Option[Iterator[T]] = {
- logDebug(s"Read partition data of $this from block manager, block $blockId")
- blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
- }
-
- def getBlockFromKinesis(): Iterator[T] = {
- val credentials = awsCredentialsOption.getOrElse {
- new DefaultAWSCredentialsProviderChain().getCredentials()
- }
- partition.seqNumberRanges.ranges.iterator.flatMap { range =>
- new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
- range, retryTimeoutMs).map(messageHandler)
- }
- }
- if (partition.isBlockIdValid) {
- getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
- } else {
- getBlockFromKinesis()
- }
- }
-}
-
-
-/**
- * An iterator that returns the Kinesis data based on the given range of sequence numbers.
- * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber,
- * until the endSequenceNumber is reached.
- */
-private[kinesis]
-class KinesisSequenceRangeIterator(
- credentials: AWSCredentials,
- endpointUrl: String,
- regionId: String,
- range: SequenceNumberRange,
- retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
-
- private val client = new AmazonKinesisClient(credentials)
- private val streamName = range.streamName
- private val shardId = range.shardId
-
- private var toSeqNumberReceived = false
- private var lastSeqNumber: String = null
- private var internalIterator: Iterator[Record] = null
-
- client.setEndpoint(endpointUrl, "kinesis", regionId)
-
- override protected def getNext(): Record = {
- var nextRecord: Record = null
- if (toSeqNumberReceived) {
- finished = true
- } else {
-
- if (internalIterator == null) {
-
- // If the internal iterator has not been initialized,
- // then fetch records from starting sequence number
- internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
- } else if (!internalIterator.hasNext) {
-
- // If the internal iterator does not have any more records,
- // then fetch more records after the last consumed sequence number
- internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
- }
-
- if (!internalIterator.hasNext) {
-
- // If the internal iterator still does not have any data, then throw exception
- // and terminate this iterator
- finished = true
- throw new SparkException(
- s"Could not read until the end sequence number of the range: $range")
- } else {
-
- // Get the record, copy the data into a byte array and remember its sequence number
- nextRecord = internalIterator.next()
- lastSeqNumber = nextRecord.getSequenceNumber()
-
- // If this record's sequence number matches the stopping sequence number, then make sure
- // the iterator is marked finished next time getNext() is called
- if (nextRecord.getSequenceNumber == range.toSeqNumber) {
- toSeqNumberReceived = true
- }
- }
- }
- nextRecord
- }
-
- override protected def close(): Unit = {
- client.shutdown()
- }
-
- /**
- * Get records starting from or after the given sequence number.
- */
- private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
- val shardIterator = getKinesisIterator(iteratorType, seqNum)
- val result = getRecordsAndNextKinesisIterator(shardIterator)
- result._1
- }
-
- /**
- * Get the records using a Kinesis shard iterator (which is a progress handle
- * to get records from Kinesis), and get the next shard iterator for next consumption.
- */
- private def getRecordsAndNextKinesisIterator(
- shardIterator: String): (Iterator[Record], String) = {
- val getRecordsRequest = new GetRecordsRequest
- getRecordsRequest.setRequestCredentials(credentials)
- getRecordsRequest.setShardIterator(shardIterator)
- val getRecordsResult = retryOrTimeout[GetRecordsResult](
- s"getting records using shard iterator") {
- client.getRecords(getRecordsRequest)
- }
- // De-aggregate records, if KPL was used in producing the records. The KCL automatically
- // handles de-aggregation during regular operation. This code path is used during recovery
- val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
- (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
- }
-
- /**
- * Get the Kinesis shard iterator for getting records starting from or after the given
- * sequence number.
- */
- private def getKinesisIterator(
- iteratorType: ShardIteratorType,
- sequenceNumber: String): String = {
- val getShardIteratorRequest = new GetShardIteratorRequest
- getShardIteratorRequest.setRequestCredentials(credentials)
- getShardIteratorRequest.setStreamName(streamName)
- getShardIteratorRequest.setShardId(shardId)
- getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
- getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
- val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
- s"getting shard iterator from sequence number $sequenceNumber") {
- client.getShardIterator(getShardIteratorRequest)
- }
- getShardIteratorResult.getShardIterator
- }
-
- /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
- private def retryOrTimeout[T](message: String)(body: => T): T = {
- import KinesisSequenceRangeIterator._
-
- var startTimeMs = System.currentTimeMillis()
- var retryCount = 0
- var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
- var result: Option[T] = None
- var lastError: Throwable = null
-
- def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
- def isMaxRetryDone = retryCount >= MAX_RETRIES
-
- while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
- if (retryCount > 0) { // wait only if this is a retry
- Thread.sleep(waitTimeMs)
- waitTimeMs *= 2 // if you have waited, then double wait time for next round
- }
- try {
- result = Some(body)
- } catch {
- case NonFatal(t) =>
- lastError = t
- t match {
- case ptee: ProvisionedThroughputExceededException =>
- logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee)
- case e: Throwable =>
- throw new SparkException(s"Error while $message", e)
- }
- }
- retryCount += 1
- }
- result.getOrElse {
- if (isTimedOut) {
- throw new SparkException(
- s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
- } else {
- throw new SparkException(
- s"Gave up after $retryCount retries while $message, last exception: ", lastError)
- }
- }
- }
-}
-
-private[streaming]
-object KinesisSequenceRangeIterator {
- val MAX_RETRIES = 3
- val MIN_RETRY_WAIT_TIME_MS = 100
-}
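
retryOrTimeout above bounds each Kinesis call by both a maximum retry count and a wall-clock timeout, doubling the wait between attempts and retrying only ProvisionedThroughputExceededException. The standalone sketch below shows the same backoff pattern in isolation; unlike the original it retries any non-fatal error, and every name in it is illustrative:

    import scala.util.control.NonFatal

    def retryWithBackoff[T](maxRetries: Int, initialWaitMs: Long, timeoutMs: Long)(body: => T): T = {
      val startMs = System.currentTimeMillis()
      var waitMs = initialWaitMs
      var attempt = 0
      var result: Option[T] = None
      var lastError: Throwable = null
      while (result.isEmpty && attempt < maxRetries &&
          System.currentTimeMillis() - startMs < timeoutMs) {
        if (attempt > 0) {          // wait only before a retry
          Thread.sleep(waitMs)
          waitMs *= 2               // exponential backoff
        }
        try {
          result = Some(body)
        } catch {
          case NonFatal(t) => lastError = t
        }
        attempt += 1
      }
      result.getOrElse(throw new RuntimeException(s"Gave up after $attempt attempts", lastError))
    }

    // Usage sketch: retryWithBackoff(3, 100, 10000) { client.getRecords(request) }
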
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
deleted file mode 100644
index 1ca6d4302c..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.concurrent._
-
-import scala.util.control.NonFatal
-
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
-
-/**
- * This is a helper class for managing Kinesis checkpointing.
- *
- * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
- * @param checkpointInterval How frequently we will checkpoint to DynamoDB
- * @param workerId Worker Id of KCL worker for logging purposes
- * @param clock In order to use ManualClocks for the purpose of testing
- */
-private[kinesis] class KinesisCheckpointer(
- receiver: KinesisReceiver[_],
- checkpointInterval: Duration,
- workerId: String,
- clock: Clock = new SystemClock) extends Logging {
-
- // a map from shardIds to checkpointers
- private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
-
- private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
-
- private val checkpointerThread: RecurringTimer = startCheckpointerThread()
-
- /** Update the checkpointer instance to the most recent one for the given shardId. */
- def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
- checkpointers.put(shardId, checkpointer)
- }
-
- /**
- * Stop tracking the specified shardId.
- *
- * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
- * we will use that to make the final checkpoint. If `null` is provided, we will not make the
- * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
- */
- def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
- synchronized {
- checkpointers.remove(shardId)
- checkpoint(shardId, checkpointer)
- }
- }
-
- /** Perform the checkpoint. */
- private def checkpoint(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
- try {
- if (checkpointer != null) {
- receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
- val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
- // Kinesis sequence numbers are monotonically increasing strings, therefore we can
- // safely do the string comparison
- if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
- /* Perform the checkpoint */
- KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
- logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" +
- s" $latestSeqNum for shardId $shardId")
- lastCheckpointedSeqNums.put(shardId, latestSeqNum)
- }
- }
- } else {
- logDebug(s"Checkpointing skipped for shardId $shardId. Checkpointer not set.")
- }
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
- }
- }
-
- /** Checkpoint the latest saved sequence numbers for all active shardIds. */
- private def checkpointAll(): Unit = synchronized {
- // if this method throws an exception, then the scheduled task will not run again
- try {
- val shardIds = checkpointers.keys()
- while (shardIds.hasMoreElements) {
- val shardId = shardIds.nextElement()
- checkpoint(shardId, checkpointers.get(shardId))
- }
- } catch {
- case NonFatal(e) =>
- logWarning("Failed to checkpoint to DynamoDB.", e)
- }
- }
-
- /**
- * Start the checkpointer thread with the given checkpoint duration.
- */
- private def startCheckpointerThread(): RecurringTimer = {
- val period = checkpointInterval.milliseconds
- val threadName = s"Kinesis Checkpointer - Worker $workerId"
- val timer = new RecurringTimer(clock, period, _ => checkpointAll(), threadName)
- timer.start()
- logDebug(s"Started checkpointer thread: $threadName")
- timer
- }
-
- /**
- * Shutdown the checkpointer. Should be called on the onStop of the Receiver.
- */
- def shutdown(): Unit = {
- // the recurring timer checkpoints for us one last time.
- checkpointerThread.stop(interruptTimer = false)
- checkpointers.clear()
- lastCheckpointedSeqNums.clear()
- logInfo("Successfully shutdown Kinesis Checkpointer.")
- }
-}
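
The checkpointer above schedules checkpointAll() through Spark's internal RecurringTimer, which is not available outside Spark. A rough equivalent of the same periodic pattern with the JDK scheduler is sketched below; the checkpointAll parameter is a stand-in supplied by the caller, not the method of the class above:

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    def startPeriodicCheckpointing(periodMs: Long)(checkpointAll: () => Unit): ScheduledExecutorService = {
      val scheduler = Executors.newSingleThreadScheduledExecutor()
      scheduler.scheduleAtFixedRate(new Runnable {
        // Swallow failures so one bad round does not cancel future runs,
        // mirroring the try/catch inside checkpointAll() above.
        override def run(): Unit =
          try checkpointAll() catch { case e: Throwable => e.printStackTrace() }
      }, periodMs, periodMs, TimeUnit.MILLISECONDS)
      scheduler  // call shutdown() when the receiver stops, like KinesisCheckpointer.shutdown()
    }
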
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
deleted file mode 100644
index 5223c81a8e..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.reflect.ClassTag
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.{Duration, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-
-private[kinesis] class KinesisInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointAppName: String,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: Record => T,
- awsCredentialsOption: Option[SerializableAWSCredentials]
- ) extends ReceiverInputDStream[T](_ssc) {
-
- private[streaming]
- override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
-
- // This returns true even when blockInfos is empty
- val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
-
- if (allBlocksHaveRanges) {
- // Create a KinesisBackedBlockRDD, even when there are no blocks
- val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
- val seqNumRanges = blockInfos.map {
- _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
- val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
- logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
- s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
- new KinesisBackedBlockRDD(
- context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
- isBlockIdValid = isBlockIdValid,
- retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
- messageHandler = messageHandler,
- awsCredentialsOption = awsCredentialsOption)
- } else {
- logWarning("Kinesis sequence number information was not present with some block metadata," +
- " it may not be possible to recover from failures")
- super.createBlockRDD(time, blockInfos)
- }
- }
-
- override def getReceiver(): Receiver[T] = {
- new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
- checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
- }
-}
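
The createBlockRDD override deleted above only rebuilds a KinesisBackedBlockRDD when every reported block carries SequenceNumberRanges metadata; a single block without ranges forces the generic, non-recoverable fallback. A minimal sketch of that all-or-nothing decision, using hypothetical stand-in types rather than Spark's ReceivedBlockInfo:

    // Hypothetical stand-in types; not Spark's ReceivedBlockInfo / KinesisBackedBlockRDD.
    object RecoveryPlanSketch {
      final case class SeqNumRange(streamName: String, shardId: String, fromSeq: String, toSeq: String)
      final case class BlockMeta(blockId: String, ranges: Option[Seq[SeqNumRange]])

      // Right: every block can be re-read from Kinesis (analogous to KinesisBackedBlockRDD).
      // Left: at least one block lacks range metadata, so fall back to the generic path.
      def planRecovery(blocks: Seq[BlockMeta]): Either[Seq[String], Seq[(String, Seq[SeqNumRange])]] = {
        // Note: forall on an empty sequence is true, matching the comment in createBlockRDD above.
        if (blocks.forall(_.ranges.nonEmpty)) {
          Right(blocks.map(b => b.blockId -> b.ranges.get))
        } else {
          Left(blocks.map(_.blockId))
        }
      }
    }
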
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
deleted file mode 100644
index 48ee2a9597..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
-import org.apache.spark.Logging
-
-private[kinesis]
-case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
- extends AWSCredentials {
- override def getAWSAccessKeyId: String = accessKeyId
- override def getAWSSecretKey: String = secretKey
-}
-
-/**
- * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
- * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
- * https://github.com/awslabs/amazon-kinesis-client
- *
- * The way this Receiver works is as follows:
- *
- *  - The receiver starts a KCL Worker, which essentially runs a thread pool of multiple
- * KinesisRecordProcessor
- * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
- * inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
- *  - When the block generator defines a block, the sequence number ranges that were inserted
- *    into that block are recorded separately for later use.
- *  - When the block is ready to be pushed, it is pushed with the ranges reported as the block's
- *    metadata. In addition, the ranges are used to find the latest sequence number for each
- *    shard that can be checkpointed through DynamoDB.
- * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
- *   number for its own shard.
- *
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Region name used by the Kinesis Client Library for
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
- * by the Kinesis Client Library. If you change the App name or Stream name,
- * the KCL will throw errors. This usually requires deleting the backing
- *                          DynamoDB table with the same name as this Kinesis application.
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects
- * @param awsCredentialsOption Optional AWS credentials, used when the user directly specifies
- * the credentials
- */
-private[kinesis] class KinesisReceiver[T](
- val streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointAppName: String,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: Record => T,
- awsCredentialsOption: Option[SerializableAWSCredentials])
- extends Receiver[T](storageLevel) with Logging { receiver =>
-
- /*
- * =================================================================================
- * The following vars are initialized in the onStart() method, which executes in the
- * Spark worker after this Receiver is serialized and shipped to the worker.
- * =================================================================================
- */
-
- /**
- * The workerId used by the KCL; it should be based on the IP address of the actual Spark Worker
- * where this code runs (not the driver's IP address).
- */
- @volatile private var workerId: String = null
-
- /**
- * Worker is the core client abstraction from the Kinesis Client Library (KCL).
- * A worker can process more than one shard from the given stream.
- * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
- * processors.
- */
- @volatile private var worker: Worker = null
- @volatile private var workerThread: Thread = null
-
- /** BlockGenerator used to generate blocks out of Kinesis data */
- @volatile private var blockGenerator: BlockGenerator = null
-
- /**
- * Sequence number ranges added to the current block being generated.
- * Accessing and updating of this buffer is synchronized by locks in BlockGenerator.
- */
- private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
-
- /** Sequence number ranges of data added to each generated block */
- private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
-
- /**
- * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
- */
- @volatile private var kinesisCheckpointer: KinesisCheckpointer = null
-
- /**
- * Latest sequence number that has been stored successfully for each shard.
- * This is used for checkpointing through the KCL.
- */
- private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
-
- /**
- * This is called when the KinesisReceiver starts and must be non-blocking.
- * The KCL creates and manages the receiving/processing thread pool through Worker.run().
- */
- override def onStart() {
- blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
-
- workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-
- kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
- // KCL config instance
- val awsCredProvider = resolveAWSCredentialsProvider()
- val kinesisClientLibConfiguration =
- new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
- .withKinesisEndpoint(endpointUrl)
- .withInitialPositionInStream(initialPositionInStream)
- .withTaskBackoffTimeMillis(500)
- .withRegionName(regionName)
-
- /*
- * RecordProcessorFactory creates impls of IRecordProcessor.
- * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
- * IRecordProcessor.processRecords() method.
- * We're using our custom KinesisRecordProcessor in this case.
- */
- val recordProcessorFactory = new IRecordProcessorFactory {
- override def createProcessor: IRecordProcessor =
- new KinesisRecordProcessor(receiver, workerId)
- }
-
- worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
- workerThread = new Thread() {
- override def run(): Unit = {
- try {
- worker.run()
- } catch {
- case NonFatal(e) =>
- restart("Error running the KCL worker in Receiver", e)
- }
- }
- }
-
- blockIdToSeqNumRanges.clear()
- blockGenerator.start()
-
- workerThread.setName(s"Kinesis Receiver ${streamId}")
- workerThread.setDaemon(true)
- workerThread.start()
-
- logInfo(s"Started receiver with workerId $workerId")
- }
-
- /**
- * This is called when the KinesisReceiver stops.
- * The KCL worker.shutdown() method stops the receiving/processing threads.
- * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
- */
- override def onStop() {
- if (workerThread != null) {
- if (worker != null) {
- worker.shutdown()
- worker = null
- }
- workerThread.join()
- workerThread = null
- logInfo(s"Stopped receiver for workerId $workerId")
- }
- workerId = null
- if (kinesisCheckpointer != null) {
- kinesisCheckpointer.shutdown()
- kinesisCheckpointer = null
- }
- }
-
- /** Add records of the given shard to the current block being generated */
- private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
- if (records.size > 0) {
- val dataIterator = records.iterator().asScala.map(messageHandler)
- val metadata = SequenceNumberRange(streamName, shardId,
- records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
- blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
- }
- }
-
- /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
- private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
- Option(shardIdToLatestStoredSeqNum.get(shardId))
- }
-
- /**
- * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
- * given shardId.
- */
- def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
- assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
- kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
- }
-
- /**
- * Remove the checkpointer for the given shardId. The provided checkpointer will be used to
- * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not
- * checkpoint.
- */
- def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
- assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
- kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
- }
-
- /**
- * Remember the range of sequence numbers that was added to the currently active block.
- * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
- */
- private def rememberAddedRange(range: SequenceNumberRange): Unit = {
- seqNumRangesInCurrentBlock += range
- }
-
- /**
- * Finalize the ranges added to the block that was active and prepare the ranges buffer
- * for next block. Internally, this is synchronized with `rememberAddedRange()`.
- */
- private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
- blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
- seqNumRangesInCurrentBlock.clear()
- logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
- }
-
- /** Store the block along with its associated ranges */
- private def storeBlockWithRanges(
- blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
- val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
- if (rangesToReportOption.isEmpty) {
- stop("Error while storing block into Spark, could not find sequence number ranges " +
- s"for block $blockId")
- return
- }
-
- val rangesToReport = rangesToReportOption.get
- var attempt = 0
- var stored = false
- var throwable: Throwable = null
- while (!stored && attempt <= 3) {
- try {
- store(arrayBuffer, rangesToReport)
- stored = true
- } catch {
- case NonFatal(th) =>
- attempt += 1
- throwable = th
- }
- }
- if (!stored) {
- stop("Error while storing block into Spark", throwable)
- }
-
-    // Update the latest sequence number that has been successfully stored for each shard.
-    // Note that we are doing this sequentially because the array of sequence number ranges
-    // is assumed to be in order of increasing sequence numbers, so the last range stored for
-    // a shard is its latest.
- rangesToReport.ranges.foreach { range =>
- shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
- }
- }
-
- /**
- * If AWS credentials are provided, return an AWSCredentialsProvider returning those credentials.
- * Otherwise, return the DefaultAWSCredentialsProviderChain.
- */
- private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
- awsCredentialsOption match {
- case Some(awsCredentials) =>
- logInfo("Using provided AWS credentials")
- new AWSCredentialsProvider {
- override def getCredentials: AWSCredentials = awsCredentials
- override def refresh(): Unit = { }
- }
- case None =>
- logInfo("Using DefaultAWSCredentialsProviderChain")
- new DefaultAWSCredentialsProviderChain()
- }
- }
-
-
- /**
- * Class to handle blocks generated by this receiver's block generator. Specifically, in
- * the context of the Kinesis Receiver, this handler does the following.
- *
- * - When an array of records is added to the current active block in the block generator,
- * this handler keeps track of the corresponding sequence number range.
- * - When the currently active block is ready to be sealed (no more records will be added),
- *   this handler keeps track of the list of ranges added into that block in a separate map.
- */
- private class GeneratedBlockHandler extends BlockGeneratorListener {
-
- /**
- * Callback method called after a data item is added into the BlockGenerator.
- * The data addition, block generation, and calls to onAddData and onGenerateBlock
- * are all synchronized through the same lock.
- */
- def onAddData(data: Any, metadata: Any): Unit = {
- rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
- }
-
- /**
- * Callback method called after a block has been generated.
- * The data addition, block generation, and calls to onAddData and onGenerateBlock
- * are all synchronized through the same lock.
- */
- def onGenerateBlock(blockId: StreamBlockId): Unit = {
- finalizeRangesForCurrentBlock(blockId)
- }
-
- /** Callback method called when a block is ready to be pushed / stored. */
- def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
- storeBlockWithRanges(blockId,
- arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
- }
-
- /** Callback called in case of any error in the internals of the BlockGenerator */
- def onError(message: String, throwable: Throwable): Unit = {
- reportError(message, throwable)
- }
- }
-}
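
The GeneratedBlockHandler above ties sequence-number ranges to generated blocks through the onAddData / onGenerateBlock / onPushBlock callbacks. A standalone sketch of that bookkeeping pattern, with simplified stand-in types (ShardRange and RangeTracker are not Spark classes):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable

    // Simplified stand-in types; not Spark's BlockGeneratorListener or SequenceNumberRange.
    final case class ShardRange(shardId: String, fromSeq: String, toSeq: String)

    class RangeTracker {
      // In the real receiver, onAddData and onGenerateBlock run under the block generator's
      // lock, so a plain buffer is enough for the ranges of the currently active block.
      private val currentBlockRanges = mutable.ArrayBuffer.empty[ShardRange]
      private val blockToRanges = new ConcurrentHashMap[String, Seq[ShardRange]]()

      // Mirrors onAddData: remember the range that was just added to the active block.
      def onAdd(range: ShardRange): Unit = currentBlockRanges += range

      // Mirrors onGenerateBlock: freeze the accumulated ranges under the new block id.
      def onSeal(blockId: String): Unit = {
        blockToRanges.put(blockId, currentBlockRanges.toVector)
        currentBlockRanges.clear()
      }

      // Mirrors onPushBlock: hand the ranges over with the block and forget them locally.
      def onPush(blockId: String): Option[Seq[ShardRange]] = Option(blockToRanges.remove(blockId))
    }
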
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
deleted file mode 100644
index b5b76cb92d..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.util.List
-
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.Duration
-
-/**
- * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
- * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
- * shard in the Kinesis stream upon startup. This is normally done in separate threads,
- * but the KCLs within the KinesisReceivers will balance themselves out if you create
- * multiple Receivers.
- *
- * @param receiver Kinesis receiver
- * @param workerId for logging purposes
- */
-private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
- extends IRecordProcessor with Logging {
-
- // shardId populated during initialize()
- @volatile
- private var shardId: String = _
-
- /**
- * The Kinesis Client Library calls this method during IRecordProcessor initialization.
- *
- * @param shardId assigned by the KCL to this particular RecordProcessor.
- */
- override def initialize(shardId: String) {
- this.shardId = shardId
- logInfo(s"Initialized workerId $workerId with shardId $shardId")
- }
-
- /**
- * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
- * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
- * and Spark Streaming's Receiver.store().
- *
- * @param batch list of records from the Kinesis stream shard
- * @param checkpointer used to update Kinesis when this batch has been processed/stored
- * in the DStream
- */
- override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
- if (!receiver.isStopped()) {
- try {
- receiver.addRecords(shardId, batch)
- logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
- receiver.setCheckpointer(shardId, checkpointer)
- } catch {
- case NonFatal(e) => {
- /*
- * If there is a failure within the batch, the batch will not be checkpointed.
- * This will potentially cause records since the last checkpoint to be processed
- * more than once.
- */
-          logError(s"Exception: WorkerId $workerId encountered an exception while storing" +
- s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
-
- /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
- throw e
- }
- }
- } else {
- /* RecordProcessor has been stopped. */
- logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" +
- s" and shardId $shardId. No more records will be processed.")
- }
- }
-
- /**
- * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
- * 1) the stream is resharding by splitting or merging adjacent shards
- * (ShutdownReason.TERMINATE)
- * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
- * (ShutdownReason.ZOMBIE)
- *
- * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
- * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
- */
- override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
- logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
- reason match {
- /*
- * TERMINATE Use Case. Checkpoint.
- * Checkpoint to indicate that all records from the shard have been drained and processed.
- * It's now OK to read from the new shards that resulted from a resharding event.
- */
- case ShutdownReason.TERMINATE =>
- receiver.removeCheckpointer(shardId, checkpointer)
-
- /*
- * ZOMBIE Use Case or Unknown reason. NoOp.
- * No checkpoint because other workers may have taken over and already started processing
- * the same records.
- * This may lead to records being processed more than once.
- */
- case _ =>
- receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint
- }
-
- }
-}
-
-private[kinesis] object KinesisRecordProcessor extends Logging {
- /**
- * Retry the given number of times with a random backoff time (millis) less than the
- * given maxBackOffMillis
- *
- * @param expression expression to evaluate
- * @param numRetriesLeft number of retries left
- * @param maxBackOffMillis max millis between retries
- *
- * @return evaluation of the given expression
- * @throws non-retryable exception, unexpected exception,
- * or any exception that persists after numRetriesLeft reaches 0
- */
- @annotation.tailrec
- def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
- util.Try { expression } match {
- /* If the function succeeded, evaluate to x. */
- case util.Success(x) => x
- /* If the function failed, either retry or throw the exception */
- case util.Failure(e) => e match {
- /* Retry: Throttling or other Retryable exception has occurred */
- case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
- => {
- val backOffMillis = Random.nextInt(maxBackOffMillis)
- Thread.sleep(backOffMillis)
- logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
- retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
- }
- /* Throw: Shutdown has been requested by the Kinesis Client Library. */
- case _: ShutdownException => {
- logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
- throw e
- }
- /* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
- case _: InvalidStateException => {
- logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
- s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
- throw e
- }
- /* Throw: Unexpected exception has occurred */
- case _ => {
- logError(s"Unexpected, non-retryable exception.", e)
- throw e
- }
- }
- }
- }
-}
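
retryRandom above retries an expression with a random backoff, rethrowing immediately on non-retryable KCL exceptions. A generic sketch of the same pattern, with the KCL exception types replaced by a caller-supplied predicate; flakyCheckpoint in the usage comment is hypothetical:

    import scala.annotation.tailrec
    import scala.util.{Failure, Random, Success, Try}

    object RetryRandomSketch {
      @tailrec
      def retryRandom[T](expr: => T, retriesLeft: Int, maxBackOffMillis: Int)
                        (isRetryable: Throwable => Boolean): T = {
        Try(expr) match {
          case Success(x) => x
          case Failure(e) if isRetryable(e) && retriesLeft > 1 =>
            Thread.sleep(Random.nextInt(maxBackOffMillis))  // random backoff before the next attempt
            retryRandom(expr, retriesLeft - 1, maxBackOffMillis)(isRetryable)
          case Failure(e) => throw e                        // non-retryable, or retries exhausted
        }
      }

      // Usage: retry a flaky call up to 4 times, sleeping at most 100 ms between attempts.
      // retryRandom(flakyCheckpoint(), 4, 100)(_.isInstanceOf[java.io.IOException])
    }
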
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
deleted file mode 100644
index 0ace453ee9..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Random, Success, Try}
-
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
-import com.amazonaws.services.dynamodbv2.document.DynamoDB
-import com.amazonaws.services.kinesis.AmazonKinesisClient
-import com.amazonaws.services.kinesis.model._
-
-import org.apache.spark.Logging
-
-/**
- * Shared utility methods for performing Kinesis tests that actually transfer data.
- *
- * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
- */
-private[kinesis] class KinesisTestUtils extends Logging {
-
- val endpointUrl = KinesisTestUtils.endpointUrl
- val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
- val streamShardCount = 2
-
- private val createStreamTimeoutSeconds = 300
- private val describeStreamPollTimeSeconds = 1
-
- @volatile
- private var streamCreated = false
-
- @volatile
- private var _streamName: String = _
-
- protected lazy val kinesisClient = {
- val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
- client.setEndpoint(endpointUrl)
- client
- }
-
- private lazy val dynamoDB = {
- val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
- dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
- new DynamoDB(dynamoDBClient)
- }
-
- protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
- if (!aggregate) {
- new SimpleDataGenerator(kinesisClient)
- } else {
- throw new UnsupportedOperationException("Aggregation is not supported through this code path")
- }
- }
-
- def streamName: String = {
- require(streamCreated, "Stream not yet created, call createStream() to create one")
- _streamName
- }
-
- def createStream(): Unit = {
- require(!streamCreated, "Stream already created")
- _streamName = findNonExistentStreamName()
-
- // Create a stream. The number of shards determines the provisioned throughput.
- logInfo(s"Creating stream ${_streamName}")
- val createStreamRequest = new CreateStreamRequest()
- createStreamRequest.setStreamName(_streamName)
- createStreamRequest.setShardCount(2)
- kinesisClient.createStream(createStreamRequest)
-
- // The stream is now being created. Wait for it to become active.
- waitForStreamToBeActive(_streamName)
- streamCreated = true
- logInfo(s"Created stream ${_streamName}")
- }
-
- /**
- * Push data to Kinesis stream and return a map of
- * shardId -> seq of (data, seq number) pushed to corresponding shard
- */
- def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
- require(streamCreated, "Stream not yet created, call createStream() to create one")
- val producer = getProducer(aggregate)
- val shardIdToSeqNumbers = producer.sendData(streamName, testData)
- logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
- shardIdToSeqNumbers.toMap
- }
-
- /**
- * Expose a Python friendly API.
- */
- def pushData(testData: java.util.List[Int]): Unit = {
- pushData(testData.asScala, aggregate = false)
- }
-
- def deleteStream(): Unit = {
- try {
- if (streamCreated) {
- kinesisClient.deleteStream(streamName)
- }
- } catch {
- case e: Exception =>
- logWarning(s"Could not delete stream $streamName")
- }
- }
-
- def deleteDynamoDBTable(tableName: String): Unit = {
- try {
- val table = dynamoDB.getTable(tableName)
- table.delete()
- table.waitForDelete()
- } catch {
- case e: Exception =>
- logWarning(s"Could not delete DynamoDB table $tableName")
- }
- }
-
- private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
- try {
- val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
- val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
- Some(desc)
- } catch {
- case rnfe: ResourceNotFoundException =>
- None
- }
- }
-
- private def findNonExistentStreamName(): String = {
- var testStreamName: String = null
- do {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
- } while (describeStream(testStreamName).nonEmpty)
- testStreamName
- }
-
- private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
- val startTime = System.currentTimeMillis()
- val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
- while (System.currentTimeMillis() < endTime) {
- Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
- describeStream(streamNameToWaitFor).foreach { description =>
- val streamStatus = description.getStreamStatus()
- logDebug(s"\t- current state: $streamStatus\n")
- if ("ACTIVE".equals(streamStatus)) {
- return
- }
- }
- }
-    require(false, s"Stream $streamNameToWaitFor never became active")
- }
-}
-
-private[kinesis] object KinesisTestUtils {
-
- val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
- val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
- val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
-
- lazy val shouldRunTests = {
- val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
- if (isEnvSet) {
- // scalastyle:off println
-      // Print this so that it is easily visible on the console and not hidden in the log4j logs.
- println(
- s"""
-          |Kinesis tests that actually send data have been enabled by setting the environment
- |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
- |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
- |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
- |To change this endpoint URL to a different region, you can set the environment variable
- |$endVarNameForEndpoint to the desired endpoint URL
- |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
- """.stripMargin)
- // scalastyle:on println
- }
- isEnvSet
- }
-
- lazy val endpointUrl = {
- val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
- // scalastyle:off println
-    // Print this so that it is easily visible on the console and not hidden in the log4j logs.
- println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
- // scalastyle:on println
- url
- }
-
- def isAWSCredentialsPresent: Boolean = {
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
- }
-
- def getAWSCredentials(): AWSCredentials = {
- assert(shouldRunTests,
- "Kinesis test not enabled, should not attempt to get AWS credentials")
- Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
- case Success(cred) => cred
- case Failure(e) =>
- throw new Exception(
- s"""
- |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
- |but could not find AWS credentials. Please follow instructions in AWS documentation
- |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
- |can find the credentials.
- """.stripMargin)
- }
- }
-}
-
-/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */
-private[kinesis] trait KinesisDataGenerator {
- /** Sends the data to Kinesis and returns the metadata for everything that has been sent. */
- def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
-}
-
-private[kinesis] class SimpleDataGenerator(
- client: AmazonKinesisClient) extends KinesisDataGenerator {
- override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
- val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
- data.foreach { num =>
- val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
- val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
- .withData(data)
- .withPartitionKey(str)
-
- val putRecordResult = client.putRecord(putRecordRequest)
- val shardId = putRecordResult.getShardId
- val seqNumber = putRecordResult.getSequenceNumber()
- val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
- new ArrayBuffer[(Int, String)]())
- sentSeqNumbers += ((num, seqNumber))
- }
-
- shardIdToSeqNumbers.toMap
- }
-}
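
A sketch of how a test might drive the KinesisTestUtils shown above. It assumes the caller lives in the org.apache.spark.streaming.kinesis package (the class is private[kinesis]) and that ENABLE_KINESIS_TESTS=1 is set, since this creates real AWS resources and may incur cost:

    // Assumes compilation into the same package as the private[kinesis] test utilities above.
    package org.apache.spark.streaming.kinesis

    object KinesisTestUtilsSketch {
      def main(args: Array[String]): Unit = {
        val testUtils = new KinesisTestUtils()
        try {
          testUtils.createStream()                                  // 2-shard stream; waits for ACTIVE
          val shardToSeqNums = testUtils.pushData(1 to 10, aggregate = false)
          println(s"Pushed to ${testUtils.streamName}: $shardToSeqNums")
        } finally {
          testUtils.deleteStream()                                  // best-effort cleanup
        }
      }
    }
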
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
deleted file mode 100644
index 15ac588b82..0000000000
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import scala.reflect.ClassTag
-
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, StreamingContext}
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object KinesisUtils {
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets the AWS credentials.
- *
- * @param ssc StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param messageHandler A custom message handler that can generate a generic output from a
- * Kinesis `Record`, which contains both message data, and metadata.
- */
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: Record => T): ReceiverInputDStream[T] = {
- val cleanedHandler = ssc.sc.clean(messageHandler)
- // Setting scope to override receiver stream's scope of "receiver stream"
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
- cleanedHandler, None)
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
- *
- * @param ssc StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param messageHandler A custom message handler that can generate a generic output from a
- * Kinesis `Record`, which contains both message data, and metadata.
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- */
- // scalastyle:off
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: Record => T,
- awsAccessKeyId: String,
- awsSecretKey: String): ReceiverInputDStream[T] = {
- // scalastyle:on
- val cleanedHandler = ssc.sc.clean(messageHandler)
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
- cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets the AWS credentials.
- *
- * @param ssc StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- def createStream(
- ssc: StreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
- // Setting scope to override receiver stream's scope of "receiver stream"
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
- defaultMessageHandler, None)
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
- *
- * @param ssc StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- */
- def createStream(
- ssc: StreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- awsAccessKeyId: String,
- awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = {
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
- initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
- defaultMessageHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- *
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
- * in [[org.apache.spark.SparkConf]].
- *
- * @param ssc StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- ssc: StreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): ReceiverInputDStream[Array[Byte]] = {
- ssc.withNamedScope("kinesis stream") {
- new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
- getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
- checkpointInterval, storageLevel, defaultMessageHandler, None)
- }
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets the AWS credentials.
- *
- * @param jssc Java StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param messageHandler A custom message handler that can generate a generic output from a
- * Kinesis `Record`, which contains both message data, and metadata.
- * @param recordClass Class of the records in DStream
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: JFunction[Record, T],
- recordClass: Class[T]): JavaReceiverInputDStream[T] = {
- implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
- val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
- createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
- initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler)
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
- *
- * @param jssc Java StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param messageHandler A custom message handler that can generate a generic output from a
- * Kinesis `Record`, which contains both message data, and metadata.
- * @param recordClass Class of the records in DStream
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- */
- // scalastyle:off
- def createStream[T](
- jssc: JavaStreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- messageHandler: JFunction[Record, T],
- recordClass: Class[T],
- awsAccessKeyId: String,
- awsSecretKey: String): JavaReceiverInputDStream[T] = {
- // scalastyle:on
- implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
- val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
- createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
- initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler,
- awsAccessKeyId, awsSecretKey)
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets the AWS credentials.
- *
- * @param jssc Java StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- def createStream(
- jssc: JavaStreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Array[Byte]] = {
- createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
- initialPositionInStream, checkpointInterval, storageLevel, defaultMessageHandler(_))
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
- *
- * @param jssc Java StreamingContext object
- * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
- * (KCL) to update DynamoDB
- * @param streamName Kinesis stream name
- * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
- * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param storageLevel Storage level to use for storing the received objects.
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- */
- def createStream(
- jssc: JavaStreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: InitialPositionInStream,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- awsAccessKeyId: String,
- awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = {
- createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
- initialPositionInStream, checkpointInterval, storageLevel,
- defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
- }
-
- /**
- * Create an input stream that pulls messages from a Kinesis stream.
- * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
- *
- * Note:
- * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
- * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
- * gets AWS credentials.
- * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
- * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
- * [[org.apache.spark.SparkConf]].
- *
- * @param jssc Java StreamingContext object
- * @param streamName Kinesis stream name
- * @param endpointUrl Endpoint url of Kinesis service
- * (e.g., https://kinesis.us-east-1.amazonaws.com)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
- * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
- * worker's initial starting position in the stream.
- * The values are either the beginning of the stream
- * per Kinesis' limit of 24 hours
- * (InitialPositionInStream.TRIM_HORIZON) or
- * the tip of the stream (InitialPositionInStream.LATEST).
- * @param storageLevel Storage level to use for storing the received objects
- * StorageLevel.MEMORY_AND_DISK_2 is recommended.
- */
- @deprecated("use other forms of createStream", "1.4.0")
- def createStream(
- jssc: JavaStreamingContext,
- streamName: String,
- endpointUrl: String,
- checkpointInterval: Duration,
- initialPositionInStream: InitialPositionInStream,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Array[Byte]] = {
- createStream(
- jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
- }
-
- private def getRegionByEndpoint(endpointUrl: String): String = {
- RegionUtils.getRegionByEndpoint(endpointUrl).getName()
- }
-
- private def validateRegion(regionName: String): String = {
- Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse {
- throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
- }
- }
-
- private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
- if (record == null) return null
- val byteBuffer = record.getData()
- val byteArray = new Array[Byte](byteBuffer.remaining())
- byteBuffer.get(byteArray)
- byteArray
- }
-}
-
-/**
- * This is a helper class that wraps the methods in KinesisUtils into a more Python-friendly class
- * and function so that they can be easily instantiated and called from Python's KinesisUtils.
- */
-private class KinesisUtilsPythonHelper {
-
- def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
- initialPositionInStream match {
- case 0 => InitialPositionInStream.LATEST
- case 1 => InitialPositionInStream.TRIM_HORIZON
- case _ => throw new IllegalArgumentException(
- "Illegal InitialPositionInStream. Please use " +
- "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
- }
- }
-
- def createStream(
- jssc: JavaStreamingContext,
- kinesisAppName: String,
- streamName: String,
- endpointUrl: String,
- regionName: String,
- initialPositionInStream: Int,
- checkpointInterval: Duration,
- storageLevel: StorageLevel,
- awsAccessKeyId: String,
- awsSecretKey: String
- ): JavaReceiverInputDStream[Array[Byte]] = {
- if (awsAccessKeyId == null && awsSecretKey != null) {
- throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
- }
- if (awsAccessKeyId != null && awsSecretKey == null) {
- throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
- }
- if (awsAccessKeyId == null && awsSecretKey == null) {
- KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
- getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
- } else {
- KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
- getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
- awsAccessKeyId, awsSecretKey)
- }
- }
-
-}
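For reference, a minimal usage sketch of the Scala createStream signature shown above (the same one exercised by KinesisStreamSuite later in this diff). This is illustrative only and not part of the moved sources; the app name, stream name, endpoint, and region are placeholders.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    object KinesisCreateStreamSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("KinesisSketch"), Seconds(2))
        // The Kinesis app name is used by the KCL for its DynamoDB checkpoint table;
        // records arrive as Array[Byte] via the default message handler.
        val stream = KinesisUtils.createStream(
          ssc, "KinesisSketch", "myKinesisStream",
          "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
          InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
        stream.map(bytes => new String(bytes)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }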
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
deleted file mode 100644
index 5c2371c543..0000000000
--- a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis;
-
-import com.amazonaws.services.kinesis.model.Record;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-/**
- * Demonstrate the use of the KinesisUtils Java API
- */
-public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testKinesisStream() {
- // Tests the API, does not actually test data receiving
- JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
-
- ssc.stop();
- }
-
-
- private static Function<Record, String> handler = new Function<Record, String>() {
- @Override
- public String call(Record record) {
- return record.getPartitionKey() + "-" + record.getSequenceNumber();
- }
- };
-
- @Test
- public void testCustomHandler() {
- // Tests the API, does not actually test data receiving
- JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream",
- "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST,
- new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class);
-
- ssc.stop();
- }
-}
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
deleted file mode 100644
index edbecdae92..0000000000
--- a/extras/kinesis-asl/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
deleted file mode 100644
index fdb270eaad..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
-import com.google.common.util.concurrent.{FutureCallback, Futures}
-
-private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
- override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
- if (!aggregate) {
- new SimpleDataGenerator(kinesisClient)
- } else {
- new KPLDataGenerator(regionName)
- }
- }
-}
-
-/** A wrapper for the KinesisProducer provided in the KPL. */
-private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
-
- private lazy val producer: KPLProducer = {
- val conf = new KinesisProducerConfiguration()
- .setRecordMaxBufferedTime(1000)
- .setMaxConnections(1)
- .setRegion(regionName)
- .setMetricsLevel("none")
-
- new KPLProducer(conf)
- }
-
- override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
- val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
- data.foreach { num =>
- val str = num.toString
- val data = ByteBuffer.wrap(str.getBytes())
- val future = producer.addUserRecord(streamName, str, data)
- val kinesisCallBack = new FutureCallback[UserRecordResult]() {
- override def onFailure(t: Throwable): Unit = {} // do nothing
-
- override def onSuccess(result: UserRecordResult): Unit = {
- val shardId = result.getShardId
- val seqNumber = result.getSequenceNumber()
- val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
- new ArrayBuffer[(Int, String)]())
- sentSeqNumbers += ((num, seqNumber))
- }
- }
- Futures.addCallback(future, kinesisCallBack)
- }
- producer.flushSync()
- shardIdToSeqNumbers.toMap
- }
-}
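A minimal sketch of driving the KPL-backed generator above from the same package; the region and stream name are placeholders, and an existing Kinesis stream plus AWS credentials are assumed.

    package org.apache.spark.streaming.kinesis

    object KPLDataGeneratorSketch {
      def main(args: Array[String]): Unit = {
        // Sends the integers 1..5 as strings through the KPL and prints, per
        // shard, the (value, sequence number) pairs that were acknowledged.
        val generator = new KPLDataGenerator("us-west-2")
        val shardIdToSeqNumbers = generator.sendData("myTestStream", 1 to 5)
        shardIdToSeqNumbers.foreach { case (shardId, seqNumbers) =>
          println(s"$shardId -> ${seqNumbers.mkString(", ")}")
        }
      }
    }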
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
deleted file mode 100644
index 2555332d22..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
-import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
-
-abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
- extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {
-
- private val testData = 1 to 8
-
- private var testUtils: KinesisTestUtils = null
- private var shardIds: Seq[String] = null
- private var shardIdToData: Map[String, Seq[Int]] = null
- private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
- private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
- private var shardIdToRange: Map[String, SequenceNumberRange] = null
- private var allRanges: Seq[SequenceNumberRange] = null
-
- private var blockManager: BlockManager = null
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- runIfTestsEnabled("Prepare KinesisTestUtils") {
- testUtils = new KPLBasedKinesisTestUtils()
- testUtils.createStream()
-
- shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
- require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
-
- shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
- shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
- shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
- shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
- val seqNumRange = SequenceNumberRange(
- testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
- (shardId, seqNumRange)
- }
- allRanges = shardIdToRange.values.toSeq
- }
- }
-
- override def beforeEach(): Unit = {
- super.beforeEach()
- val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
- sc = new SparkContext(conf)
- blockManager = sc.env.blockManager
- }
-
- override def afterAll(): Unit = {
- try {
- if (testUtils != null) {
- testUtils.deleteStream()
- }
- } finally {
- super.afterAll()
- }
- }
-
- testIfEnabled("Basic reading from Kinesis") {
- // Verify all data using multiple ranges in a single RDD partition
- val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
- testUtils.endpointUrl, fakeBlockIds(1),
- Array(SequenceNumberRanges(allRanges.toArray))
- ).map { bytes => new String(bytes).toInt }.collect()
- assert(receivedData1.toSet === testData.toSet)
-
- // Verify all data using one range in each of the multiple RDD partitions
- val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
- testUtils.endpointUrl, fakeBlockIds(allRanges.size),
- allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
- ).map { bytes => new String(bytes).toInt }.collect()
- assert(receivedData2.toSet === testData.toSet)
-
- // Verify ordering within each partition
- val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
- testUtils.endpointUrl, fakeBlockIds(allRanges.size),
- allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
- ).map { bytes => new String(bytes).toInt }.collectPartitions()
- assert(receivedData3.length === allRanges.size)
- for (i <- 0 until allRanges.size) {
- assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
- }
- }
-
- testIfEnabled("Read data available in both block manager and Kinesis") {
- testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
- }
-
- testIfEnabled("Read data available only in block manager, not in Kinesis") {
- testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
- }
-
- testIfEnabled("Read data available only in Kinesis, not in block manager") {
- testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
- }
-
- testIfEnabled("Read data available partially in block manager, rest in Kinesis") {
- testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1)
- }
-
- testIfEnabled("Test isBlockValid skips block fetching from block manager") {
- testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
- testIsBlockValid = true)
- }
-
- testIfEnabled("Test whether RDD is valid after removing blocks from block anager") {
- testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
- testBlockRemove = true)
- }
-
- /**
- * Test the KinesisBackedBlockRDD by writing some partitions of the data to the block manager
- * and the rest to Kinesis, and then reading it all back using the RDD.
- * It can also test whether the partitions that were read from Kinesis were again stored in
- * the block manager.
- *
- * @param numPartitions Number of partitions in RDD
- * @param numPartitionsInBM Number of partitions to write to the BlockManager.
- * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
- * @param numPartitionsInKinesis Number of partitions to write to Kinesis.
- *                               Partitions (numPartitions - numPartitionsInKinesis) to
- *                               (numPartitions - 1) will be written to Kinesis
- * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
- * @param testBlockRemove Test whether calling rdd.removeBlocks() leaves the RDD still usable,
- *                        with reads falling back to Kinesis
- *
- * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInKinesis = 4
- *
- * numPartitionsInBM = 3
- * |------------------|
- * | |
- * 0 1 2 3 4
- * | |
- * |-------------------------|
- * numPartitionsInKinesis = 4
- */
- private def testRDD(
- numPartitions: Int,
- numPartitionsInBM: Int,
- numPartitionsInKinesis: Int,
- testIsBlockValid: Boolean = false,
- testBlockRemove: Boolean = false
- ): Unit = {
- require(shardIds.size > 1, "Need at least 2 shards to test")
- require(numPartitionsInBM <= shardIds.size,
- "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
- require(numPartitionsInKinesis <= shardIds.size,
- "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
- require(numPartitionsInBM <= numPartitions,
- "Number of partitions in BlockManager cannot be more than that in RDD")
- require(numPartitionsInKinesis <= numPartitions,
- "Number of partitions in Kinesis cannot be more than that in RDD")
-
- // Put necessary blocks in the block manager
- val blockIds = fakeBlockIds(numPartitions)
- blockIds.foreach(blockManager.removeBlock(_))
- (0 until numPartitionsInBM).foreach { i =>
- val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
- blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
- }
-
- // Create the necessary ranges to use in the RDD
- val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
- SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
- val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
- val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
- SequenceNumberRanges(Array(range))
- }
- val ranges = (fakeRanges ++ realRanges)
-
-
- // Make sure that the first `numPartitionsInBM` blocks are in the block manager, and the others are not
- require(
- blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
- "Expected blocks not in BlockManager"
- )
-
- require(
- blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
- "Unexpected blocks in BlockManager"
- )
-
- // Make sure that the last `numPartitionsInKinesis` ranges point to the real stream, and the others do not
- require(
- ranges.takeRight(numPartitionsInKinesis).forall {
- _.ranges.forall { _.streamName == testUtils.streamName }
- }, "Incorrect configuration of RDD, expected ranges not set: "
- )
-
- require(
- ranges.dropRight(numPartitionsInKinesis).forall {
- _.ranges.forall { _.streamName != testUtils.streamName }
- }, "Incorrect configuration of RDD, unexpected ranges set"
- )
-
- val rdd = new KinesisBackedBlockRDD[Array[Byte]](
- sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
- val collectedData = rdd.map { bytes =>
- new String(bytes).toInt
- }.collect()
- assert(collectedData.toSet === testData.toSet)
-
- // Verify that the block fetching is skipped when isBlockValid is set to false.
- // This is done by using an RDD whose data is only in memory but which is set to skip block
- // fetching. Using that RDD will throw an exception, as it skips block fetching even if the
- // blocks are in the BlockManager.
- if (testIsBlockValid) {
- require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
- require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
- val rdd2 = new KinesisBackedBlockRDD[Array[Byte]](
- sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
- isBlockIdValid = Array.fill(blockIds.length)(false))
- intercept[SparkException] {
- rdd2.collect()
- }
- }
-
- // Verify that the RDD is still valid after the blocks are removed, and can still read data
- // directly from Kinesis
- if (testBlockRemove) {
- require(numPartitions === numPartitionsInKinesis,
- "All partitions must be in WAL for this test")
- require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
- rdd.removeBlocks()
- assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
- }
- }
-
- /** Generate fake block ids */
- private def fakeBlockIds(num: Int): Array[BlockId] = {
- Array.tabulate(num) { i => new StreamBlockId(0, i) }
- }
-}
-
-class WithAggregationKinesisBackedBlockRDDSuite
- extends KinesisBackedBlockRDDTests(aggregateTestData = true)
-
-class WithoutAggregationKinesisBackedBlockRDDSuite
- extends KinesisBackedBlockRDDTests(aggregateTestData = false)
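For orientation, a sketch of constructing the RDD exercised by this suite, written against the constructor calls visible above. The region, endpoint, shard id, and sequence numbers are placeholders, and the code sits in the same package as the suite because these classes are internal to the Kinesis module.

    package org.apache.spark.streaming.kinesis

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.{BlockId, StreamBlockId}

    object KinesisBackedBlockRDDSketch {
      // One partition: data is served from the block manager when the block is
      // present, otherwise re-read from Kinesis using the sequence-number range.
      def readBack(sc: SparkContext): Array[Int] = {
        val range = SequenceNumberRange("myStream", "shardId-000000000000", "startSeqNum", "endSeqNum")
        val blockIds: Array[BlockId] = Array(StreamBlockId(0, 0))
        val rdd = new KinesisBackedBlockRDD[Array[Byte]](
          sc, "us-west-2", "https://kinesis.us-west-2.amazonaws.com",
          blockIds, Array(SequenceNumberRanges(range)))
        rdd.map(bytes => new String(bytes).toInt).collect()
      }
    }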
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
deleted file mode 100644
index e1499a8220..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import java.util.concurrent.{ExecutorService, TimeoutException}
-
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.streaming.{Duration, TestSuiteBase}
-import org.apache.spark.util.ManualClock
-
-class KinesisCheckpointerSuite extends TestSuiteBase
- with MockitoSugar
- with BeforeAndAfterEach
- with PrivateMethodTester
- with Eventually {
-
- private val workerId = "dummyWorkerId"
- private val shardId = "dummyShardId"
- private val seqNum = "123"
- private val otherSeqNum = "245"
- private val checkpointInterval = Duration(10)
- private val someSeqNum = Some(seqNum)
- private val someOtherSeqNum = Some(otherSeqNum)
-
- private var receiverMock: KinesisReceiver[Array[Byte]] = _
- private var checkpointerMock: IRecordProcessorCheckpointer = _
- private var kinesisCheckpointer: KinesisCheckpointer = _
- private var clock: ManualClock = _
-
- private val checkpoint = PrivateMethod[Unit]('checkpoint)
-
- override def beforeEach(): Unit = {
- receiverMock = mock[KinesisReceiver[Array[Byte]]]
- checkpointerMock = mock[IRecordProcessorCheckpointer]
- clock = new ManualClock()
- kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock)
- }
-
- test("checkpoint is not called twice for the same sequence number") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
- kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
- kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-
- verify(checkpointerMock, times(1)).checkpoint(anyString())
- }
-
- test("checkpoint is called after sequence number increases") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
- .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
- kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
- kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
-
- verify(checkpointerMock, times(1)).checkpoint(seqNum)
- verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
- }
-
- test("should checkpoint if we have exceeded the checkpoint interval") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
- .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
-
- kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
- clock.advance(5 * checkpointInterval.milliseconds)
-
- eventually(timeout(1 second)) {
- verify(checkpointerMock, times(1)).checkpoint(seqNum)
- verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
- }
- }
-
- test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
- kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
- clock.advance(checkpointInterval.milliseconds / 2)
-
- verify(checkpointerMock, never()).checkpoint(anyString())
- }
-
- test("should not checkpoint for the same sequence number") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
- kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
-
- clock.advance(checkpointInterval.milliseconds * 5)
- eventually(timeout(1 second)) {
- verify(checkpointerMock, atMost(1)).checkpoint(anyString())
- }
- }
-
- test("removing checkpointer checkpoints one last time") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
- kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock)
- verify(checkpointerMock, times(1)).checkpoint(anyString())
- }
-
- test("if checkpointing is going on, wait until finished before removing and checkpointing") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
- .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
- when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
- override def answer(invocations: InvocationOnMock): Unit = {
- clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
- }
- })
-
- kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
- clock.advance(checkpointInterval.milliseconds)
- eventually(timeout(1 second)) {
- verify(checkpointerMock, times(1)).checkpoint(anyString())
- }
- // don't block test thread
- val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))(
- ExecutionContext.global)
-
- intercept[TimeoutException] {
- Await.ready(f, 50 millis)
- }
-
- clock.advance(checkpointInterval.milliseconds / 2)
- eventually(timeout(1 second)) {
- verify(checkpointerMock, times(2)).checkpoint(anyString())
- }
- }
-}
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
deleted file mode 100644
index ee428f31d6..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * Helper trait that runs the Kinesis real-data-transfer tests, or
- * ignores them, depending on whether the enabling environment variable is set.
- */
-trait KinesisFunSuite extends SparkFunSuite {
- import KinesisTestUtils._
-
- /** Run the test if the enabling environment variable is set, otherwise ignore it */
- def testIfEnabled(testName: String)(testBody: => Unit) {
- if (shouldRunTests) {
- test(testName)(testBody)
- } else {
- ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
- }
- }
-
- /** Run the given body of code only if Kinesis tests are enabled */
- def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
- if (shouldRunTests) {
- body
- } else {
- ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
- }
- }
-}
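An illustrative suite (not part of the moved sources) showing how the trait above gates a test on the enabling environment variable; the test name, data, and stream contents are placeholders.

    package org.apache.spark.streaming.kinesis

    // Runs only when Kinesis tests are enabled via the env var; otherwise the
    // test is reported as ignored with a hint about how to enable it.
    class ExampleKinesisRoundTripSuite extends KinesisFunSuite {
      testIfEnabled("round-trip a few records through a temporary stream") {
        val testUtils = new KPLBasedKinesisTestUtils()
        testUtils.createStream()
        try {
          testUtils.pushData(1 to 3, aggregate = false)
        } finally {
          testUtils.deleteStream()
        }
      }
    }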
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
deleted file mode 100644
index fd15b6ccdc..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.kinesis
-
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.Arrays
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions._
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
-import com.amazonaws.services.kinesis.model.Record
-import org.mockito.Matchers._
-import org.mockito.Matchers.{eq => meq}
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.streaming.{Duration, TestSuiteBase}
-import org.apache.spark.util.Utils
-
-/**
- * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
- */
-class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
- with MockitoSugar {
-
- val app = "TestKinesisReceiver"
- val stream = "mySparkStream"
- val endpoint = "endpoint-url"
- val workerId = "dummyWorkerId"
- val shardId = "dummyShardId"
- val seqNum = "dummySeqNum"
- val checkpointInterval = Duration(10)
- val someSeqNum = Some(seqNum)
-
- val record1 = new Record()
- record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
- val record2 = new Record()
- record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
- val batch = Arrays.asList(record1, record2)
-
- var receiverMock: KinesisReceiver[Array[Byte]] = _
- var checkpointerMock: IRecordProcessorCheckpointer = _
-
- override def beforeFunction(): Unit = {
- receiverMock = mock[KinesisReceiver[Array[Byte]]]
- checkpointerMock = mock[IRecordProcessorCheckpointer]
- }
-
- test("check serializability of SerializableAWSCredentials") {
- Utils.deserialize[SerializableAWSCredentials](
- Utils.serialize(new SerializableAWSCredentials("x", "y")))
- }
-
- test("process records including store and set checkpointer") {
- when(receiverMock.isStopped()).thenReturn(false)
-
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.processRecords(batch, checkpointerMock)
-
- verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, times(1)).addRecords(shardId, batch)
- verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
- }
-
- test("shouldn't store and update checkpointer when receiver is stopped") {
- when(receiverMock.isStopped()).thenReturn(true)
-
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.processRecords(batch, checkpointerMock)
-
- verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
- verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
- }
-
- test("shouldn't update checkpointer when exception occurs during store") {
- when(receiverMock.isStopped()).thenReturn(false)
- when(
- receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
- ).thenThrow(new RuntimeException())
-
- intercept[RuntimeException] {
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.processRecords(batch, checkpointerMock)
- }
-
- verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, times(1)).addRecords(shardId, batch)
- verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
- }
-
- test("shutdown should checkpoint if the reason is TERMINATE") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
-
- verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock))
- }
-
-
- test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
- when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
-
- val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
- recordProcessor.initialize(shardId)
- recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
- recordProcessor.shutdown(checkpointerMock, null)
-
- verify(receiverMock, times(2)).removeCheckpointer(meq(shardId),
- meq[IRecordProcessorCheckpointer](null))
- }
-
- test("retry success on first attempt") {
- val expectedIsStopped = false
- when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
-
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
-
- verify(receiverMock, times(1)).isStopped()
- }
-
- test("retry success on second attempt after a Kinesis throttling exception") {
- val expectedIsStopped = false
- when(receiverMock.isStopped())
- .thenThrow(new ThrottlingException("error message"))
- .thenReturn(expectedIsStopped)
-
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
-
- verify(receiverMock, times(2)).isStopped()
- }
-
- test("retry success on second attempt after a Kinesis dependency exception") {
- val expectedIsStopped = false
- when(receiverMock.isStopped())
- .thenThrow(new KinesisClientLibDependencyException("error message"))
- .thenReturn(expectedIsStopped)
-
- val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
- assert(actualVal == expectedIsStopped)
-
- verify(receiverMock, times(2)).isStopped()
- }
-
- test("retry failed after a shutdown exception") {
- when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
-
- intercept[ShutdownException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
-
- verify(checkpointerMock, times(1)).checkpoint()
- }
-
- test("retry failed after an invalid state exception") {
- when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
-
- intercept[InvalidStateException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
-
- verify(checkpointerMock, times(1)).checkpoint()
- }
-
- test("retry failed after unexpected exception") {
- when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
-
- intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
-
- verify(checkpointerMock, times(1)).checkpoint()
- }
-
- test("retry failed after exhausing all retries") {
- val expectedErrorMessage = "final try error message"
- when(checkpointerMock.checkpoint())
- .thenThrow(new ThrottlingException("error message"))
- .thenThrow(new ThrottlingException(expectedErrorMessage))
-
- val exception = intercept[RuntimeException] {
- KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
- }
- exception.getMessage().shouldBe(expectedErrorMessage)
-
- verify(checkpointerMock, times(2)).checkpoint()
- }
-}
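For context, a minimal sketch of the retry helper these tests exercise: it re-evaluates the by-name expression on Kinesis throttling or dependency exceptions, up to the given number of attempts, with a randomized backoff capped at the given milliseconds. The attempt count and backoff values below are placeholders.

    package org.apache.spark.streaming.kinesis

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

    object RetryRandomSketch {
      // Checkpoint with up to 4 attempts and at most 100 ms of random backoff
      // between attempts; non-retryable exceptions propagate immediately.
      def checkpointWithRetries(checkpointer: IRecordProcessorCheckpointer): Unit = {
        KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
      }
    }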
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
deleted file mode 100644
index ca5d13da46..0000000000
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kinesis
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import com.amazonaws.regions.RegionUtils
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import com.amazonaws.services.kinesis.model.Record
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.Matchers._
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.kinesis.KinesisTestUtils._
-import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
-import org.apache.spark.util.Utils
-
-abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
- with Eventually with BeforeAndAfter with BeforeAndAfterAll {
-
- // This is the name that KCL will use to save metadata to DynamoDB
- private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
- private val batchDuration = Seconds(1)
-
- // Dummy parameters for API testing
- private val dummyEndpointUrl = defaultEndpointUrl
- private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
- private val dummyAWSAccessKey = "dummyAccessKey"
- private val dummyAWSSecretKey = "dummySecretKey"
-
- private var testUtils: KinesisTestUtils = null
- private var ssc: StreamingContext = null
- private var sc: SparkContext = null
-
- override def beforeAll(): Unit = {
- val conf = new SparkConf()
- .setMaster("local[4]")
- .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
- sc = new SparkContext(conf)
-
- runIfTestsEnabled("Prepare KinesisTestUtils") {
- testUtils = new KPLBasedKinesisTestUtils()
- testUtils.createStream()
- }
- }
-
- override def afterAll(): Unit = {
- if (ssc != null) {
- ssc.stop()
- }
- if (sc != null) {
- sc.stop()
- }
- if (testUtils != null) {
- // Delete the Kinesis stream as well as the DynamoDB table generated by
- // Kinesis Client Library when consuming the stream
- testUtils.deleteStream()
- testUtils.deleteDynamoDBTable(appName)
- }
- }
-
- before {
- ssc = new StreamingContext(sc, batchDuration)
- }
-
- after {
- if (ssc != null) {
- ssc.stop(stopSparkContext = false)
- ssc = null
- }
- if (testUtils != null) {
- testUtils.deleteDynamoDBTable(appName)
- }
- }
-
- test("KinesisUtils API") {
- // Tests the API, does not actually test data receiving
- val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
- dummyEndpointUrl, Seconds(2),
- InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
- dummyEndpointUrl, dummyRegionName,
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
- val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
- dummyEndpointUrl, dummyRegionName,
- InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
- dummyAWSAccessKey, dummyAWSSecretKey)
- }
-
- test("RDD generation") {
- val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
- dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
- StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
- assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
-
- val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
- val time = Time(1000)
-
- // Generate block info data for testing
- val seqNumRanges1 = SequenceNumberRanges(
- SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
- val blockId1 = StreamBlockId(kinesisStream.id, 123)
- val blockInfo1 = ReceivedBlockInfo(
- 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
-
- val seqNumRanges2 = SequenceNumberRanges(
- SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
- val blockId2 = StreamBlockId(kinesisStream.id, 345)
- val blockInfo2 = ReceivedBlockInfo(
- 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
-
- // Verify that the generated KinesisBackedBlockRDD has all the right information
- val blockInfos = Seq(blockInfo1, blockInfo2)
- val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
- nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
- val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
- assert(kinesisRDD.regionName === dummyRegionName)
- assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
- assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
- assert(kinesisRDD.awsCredentialsOption ===
- Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
- assert(nonEmptyRDD.partitions.size === blockInfos.size)
- nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
- val partitions = nonEmptyRDD.partitions.map {
- _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
- assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
- assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
- assert(partitions.forall { _.isBlockIdValid === true })
-
- // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
- val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
- emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
- emptyRDD.partitions shouldBe empty
-
- // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
- blockInfos.foreach { _.setBlockIdInvalid() }
- kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
- assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
- }
- }
-
-
- /**
- * Test the stream by sending data to a Kinesis stream and receiving from it.
- * This test is not run by default, as it requires AWS credentials that the test
- * environment may not have. Even if AWS credentials are available, the user
- * may not want to run these tests, to avoid Kinesis costs. To enable this test,
- * you must have AWS credentials available through the default AWS provider chain,
- * and you have to set the environment variable RUN_KINESIS_TESTS=1.
- */
- testIfEnabled("basic operation") {
- val awsCredentials = KinesisTestUtils.getAWSCredentials()
- val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
- testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
- Seconds(10), StorageLevel.MEMORY_ONLY,
- awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
- stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
- }
- ssc.start()
-
- val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
- testUtils.pushData(testData, aggregateTestData)
- assert(collected === testData.toSet, "\nData received does not match data sent")
- }
- ssc.stop(stopSparkContext = false)
- }
-
- testIfEnabled("custom message handling") {
- val awsCredentials = KinesisTestUtils.getAWSCredentials()
- def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
- val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
- testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
- Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
- awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
- stream shouldBe a [ReceiverInputDStream[_]]
-
- val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
- stream.foreachRDD { rdd =>
- collected ++= rdd.collect()
- logInfo("Collected = " + collected.mkString(", "))
- }
- ssc.start()
-
- val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
- testUtils.pushData(testData, aggregateTestData)
- val modData = testData.map(_ + 5)
- assert(collected === modData.toSet, "\nData received does not match data sent")
- }
- ssc.stop(stopSparkContext = false)
- }
-
- testIfEnabled("failure recovery") {
- val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
- val checkpointDir = Utils.createTempDir().getAbsolutePath
-
- ssc = new StreamingContext(sc, Milliseconds(1000))
- ssc.checkpoint(checkpointDir)
-
- val awsCredentials = KinesisTestUtils.getAWSCredentials()
- val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-
- val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
- testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
- Seconds(10), StorageLevel.MEMORY_ONLY,
- awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
-
- // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
- kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
- val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
- val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
- collectedData.synchronized {
- collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
- }
- })
-
- ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
- ssc.start()
-
- def numBatchesWithData: Int =
- collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
-
- def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
-
- // Run until there are at least 10 batches with some data in them
- // If this times out because numBatchesWithData stays at zero, then it's likely that the foreachRDD
- // function failed with exceptions, and nothing got added to `collectedData`
- eventually(timeout(2 minutes), interval(1 seconds)) {
- testUtils.pushData(1 to 5, aggregateTestData)
- assert(isCheckpointPresent && numBatchesWithData > 10)
- }
- ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
-
- // Restart the context from the checkpoint and verify that the recovered stream reproduces the data
- logInfo("Restarting from checkpoint")
- ssc = new StreamingContext(checkpointDir)
- ssc.start()
- val recoveredKinesisStream = ssc.graph.getInputStreams().head
-
- // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
- // and return the same data
- collectedData.synchronized {
- val times = collectedData.keySet
- times.foreach { time =>
- val (arrayOfSeqNumRanges, data) = collectedData(time)
- val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
- rdd shouldBe a[KinesisBackedBlockRDD[_]]
-
- // Verify the recovered sequence ranges
- val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
- assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
- arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
- assert(expected.ranges.toSeq === found.ranges.toSeq)
- }
-
- // Verify the recovered data
- assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
- }
- }
- ssc.stop()
- }
-}
-
-class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
-
-class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
deleted file mode 100644
index bfb92791de..0000000000
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-ganglia-lgpl_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Ganglia Integration</name>
-
- <properties>
- <sbt.project.name>ganglia-lgpl</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-ganglia</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
deleted file mode 100644
index 3b1880e143..0000000000
--- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.metrics.sink
-
-import java.util.Properties
-import java.util.concurrent.TimeUnit
-
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.ganglia.GangliaReporter
-import info.ganglia.gmetric4j.gmetric.GMetric
-import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
-
-import org.apache.spark.SecurityManager
-import org.apache.spark.metrics.MetricsSystem
-
-class GangliaSink(val property: Properties, val registry: MetricRegistry,
- securityMgr: SecurityManager) extends Sink {
- val GANGLIA_KEY_PERIOD = "period"
- val GANGLIA_DEFAULT_PERIOD = 10
-
- val GANGLIA_KEY_UNIT = "unit"
- val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
-
- val GANGLIA_KEY_MODE = "mode"
- val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
-
- // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
- val GANGLIA_KEY_TTL = "ttl"
- val GANGLIA_DEFAULT_TTL = 1
-
- val GANGLIA_KEY_HOST = "host"
- val GANGLIA_KEY_PORT = "port"
-
- def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
-
- if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
- throw new Exception("Ganglia sink requires 'host' property.")
- }
-
- if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
- throw new Exception("Ganglia sink requires 'port' property.")
- }
-
- val host = propertyToOption(GANGLIA_KEY_HOST).get
- val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
- val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
- val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
- .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
- val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
- .getOrElse(GANGLIA_DEFAULT_PERIOD)
- val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
- .map(u => TimeUnit.valueOf(u.toUpperCase))
- .getOrElse(GANGLIA_DEFAULT_UNIT)
-
- MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
-
- val ganglia = new GMetric(host, port, mode, ttl)
- val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .convertRatesTo(TimeUnit.SECONDS)
- .build(ganglia)
-
- override def start() {
- reporter.start(pollPeriod, pollUnit)
- }
-
- override def stop() {
- reporter.stop()
- }
-
- override def report() {
- reporter.report()
- }
-}
-
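For completeness, a hedged metrics.properties sketch for wiring up the sink above, using the property keys it reads ("host" and "port" are required; "period", "unit", "mode", and "ttl" otherwise fall back to the defaults in the code). The host and port values are placeholders, and the module itself is only available when Spark is built with its optional Ganglia profile, per the LGPL note in the pom.

    # Hypothetical metrics.properties snippet; host/port are placeholders.
    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=ganglia.example.com
    *.sink.ganglia.port=8649
    *.sink.ganglia.period=10
    *.sink.ganglia.unit=seconds
    *.sink.ganglia.mode=multicast
    *.sink.ganglia.ttl=1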