author    Chris Fregly <chris@fregly.com>  2014-08-02 13:35:35 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-08-02 13:35:35 -0700
commit    91f9504e6086fac05b40545099f9818949c24bca (patch)
tree      c79c63f0b3f82c4c9b632072f384b85bc7f646f1 /extras/kinesis-asl
parent    67bd8e3c217a80c3117a6e3853aa60fe13d08c91 (diff)
[SPARK-1981] Add AWS Kinesis streaming support
Author: Chris Fregly <chris@fregly.com>

Closes #1434 from cfregly/master and squashes the following commits:

4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback. simplified examples, updated docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback: moved the kinesis examples into the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback: updated docs, simplified the KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support
Diffstat (limited to 'extras/kinesis-asl')
-rw-r--r--  extras/kinesis-asl/pom.xml  96
-rw-r--r--  extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java  180
-rw-r--r--  extras/kinesis-asl/src/main/resources/log4j.properties  37
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala  251
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala  56
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala  149
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala  212
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala  96
-rw-r--r--  extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java  41
-rw-r--r--  extras/kinesis-asl/src/test/resources/log4j.properties  26
-rw-r--r--  extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala  275
11 files changed, 1419 insertions, 0 deletions
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
new file mode 100644
index 0000000000..a54b34235d
--- /dev/null
+++ b/extras/kinesis-asl/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Kinesis Integration</name>
+
+ <properties>
+ <sbt.project.name>kinesis-asl</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-client</artifactId>
+ <version>${aws.kinesis.client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>${aws.java.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
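
Because of the ASL-licensed AWS dependencies noted in the pom above, this module is not bundled into the core Spark assembly; a downstream project has to declare it explicitly. A minimal sbt sketch (hypothetical downstream build, using the coordinates and snapshot version from the pom):

    // build.sbt -- pulls in spark-streaming-kinesis-asl_2.10 via %%
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.1.0-SNAPSHOT"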
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
new file mode 100644
index 0000000000..a8b907b241
--- /dev/null
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ * for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ * <stream-name> and <endpoint-url>.
+ *
+ * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ * in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
+ * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> is the endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ * $ $SPARK_HOME/bin/run-example \
+ * org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
+ * onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ */
+public final class JavaKinesisWordCountASL {
+ private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+ private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+ /* Make the constructor private to enforce singleton */
+ private JavaKinesisWordCountASL() {
+ }
+
+ public static void main(String[] args) {
+ /* Check that all required args were passed in. */
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
+ " <stream-name> is the name of the Kinesis stream\n" +
+ " <endpoint-url> is the endpoint of the Kinesis service\n" +
+ " (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ /* Populate the appropriate variables from the given args */
+ String streamName = args[0];
+ String endpointUrl = args[1];
+ /* Set the batch interval to a fixed 2000 millis (2 seconds) */
+ Duration batchInterval = new Duration(2000);
+
+ /* Create a Kinesis client in order to determine the number of shards for the given stream */
+ AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
+ new DefaultAWSCredentialsProviderChain());
+ kinesisClient.setEndpoint(endpointUrl);
+
+ /* Determine the number of shards from the stream */
+ int numShards = kinesisClient.describeStream(streamName)
+ .getStreamDescription().getShards().size();
+
+ /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
+ int numStreams = numShards;
+
+ /* Must add 1 more thread than the number of receivers or the output won't show properly from the driver */
+ int numSparkThreads = numStreams + 1;
+
+ /* Setup the Spark config. */
+ SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster(
+ "local[" + numSparkThreads + "]");
+
+ /* Kinesis checkpoint interval. Same as batchInterval for this example. */
+ Duration checkpointInterval = batchInterval;
+
+ /* Setup the StreamingContext */
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+ /* Create the same number of Kinesis DStreams/Receivers as the stream has shards */
+ List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+ for (int i = 0; i < numStreams; i++) {
+ streamsList.add(
+ KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+ );
+ }
+
+ /* Union all the streams if there is more than 1 stream */
+ JavaDStream<byte[]> unionStreams;
+ if (streamsList.size() > 1) {
+ unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+ } else {
+ /* Otherwise, just use the 1 stream */
+ unionStreams = streamsList.get(0);
+ }
+
+ /*
+ * Split each line of the unioned DStream into words: flatMap converts each
+ * byte[] record to a String, then splits it on WORD_SEPARATOR.
+ */
+ JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+ @Override
+ public Iterable<String> call(byte[] line) {
+ return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+ }
+ });
+
+ /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ /* Print the first 10 wordCounts */
+ wordCounts.print();
+
+ /* Start the streaming context and await termination */
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..97348fb5b6
--- /dev/null
+++ b/extras/kinesis-asl/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
new file mode 100644
index 0000000000..d03edf8b30
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import scala.util.Random
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
+ * the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ * for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ * <stream-name> and <endpoint-url>.
+ *
+ * Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ * in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+ * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> is the endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ * $ $SPARK_HOME/bin/run-example \
+ * org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducerASL which puts
+ * dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ */
+object KinesisWordCountASL extends Logging {
+ def main(args: Array[String]) {
+ /* Check that all required args were passed in. */
+ if (args.length < 2) {
+ System.err.println(
+ """
+ |Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+ | <stream-name> is the name of the Kinesis stream
+ | <endpoint-url> is the endpoint of the Kinesis service
+ | (e.g. https://kinesis.us-east-1.amazonaws.com)
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ /* Populate the appropriate variables from the given args */
+ val Array(streamName, endpointUrl) = args
+
+ /* Determine the number of shards from the stream */
+ val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+ kinesisClient.setEndpoint(endpointUrl)
+ val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+ .size()
+
+ /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+ val numStreams = numShards
+
+ /*
+ * numSparkThreads should be 1 more thread than the number of receivers.
+ * This leaves one thread available for actually processing the data.
+ */
+ val numSparkThreads = numStreams + 1
+
+ /* Set up the SparkConf and StreamingContext */
+ /* Spark Streaming batch interval */
+ val batchInterval = Milliseconds(2000)
+ val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
+ .setMaster(s"local[$numSparkThreads]")
+ val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+ /* Kinesis checkpoint interval. Same as batchInterval for this example. */
+ val kinesisCheckpointInterval = batchInterval
+
+ /* Create the same number of Kinesis DStreams/Receivers as the stream has shards */
+ val kinesisStreams = (0 until numStreams).map { i =>
+ KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ }
+
+ /* Union all the streams */
+ val unionStreams = ssc.union(kinesisStreams)
+
+ /* Convert each line of Array[Byte] to String, split into words, and count them */
+ val words = unionStreams.flatMap(byteArray => new String(byteArray)
+ .split(" "))
+
+ /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+ val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+ /* Print the first 10 wordCounts */
+ wordCounts.print()
+
+ /* Start the streaming context and await termination */
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+/**
+ * Usage: KinesisWordCountProducerASL <stream-name> <kinesis-endpoint-url>
+ * <records-per-sec> <words-per-record>
+ * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ * <kinesis-endpoint-url> is the endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ * <records-per-sec> is the rate of records per second to put onto the stream
+ * <words-per-record> is the number of words per record to put onto the stream
+ *
+ * Example:
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ * $ $SPARK_HOME/bin/run-example \
+ * org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com 10 5
+ */
+object KinesisWordCountProducerASL {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>" +
+ " <records-per-sec> <words-per-record>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ /* Populate the appropriate variables from the given args */
+ val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
+
+ /* Generate the records and return the totals */
+ val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
+
+ /* Print the array of (index, total) tuples */
+ println("Totals")
+ totals.foreach(total => println(total.toString()))
+ }
+
+ def generate(stream: String,
+ endpoint: String,
+ recordsPerSecond: Int,
+ wordsPerRecord: Int): Seq[(Int, Int)] = {
+
+ val MaxRandomInts = 10
+
+ /* Create the Kinesis client */
+ val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+ kinesisClient.setEndpoint(endpoint)
+
+ println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
+ s" $recordsPerSecond records per second and $wordsPerRecord words per record")
+
+ val totals = new Array[Int](MaxRandomInts)
+ /* Put String records onto the stream per the given recordsPerSecond and wordsPerRecord */
+ for (i <- 1 to 5) {
+
+ /* Generate recordsPerSecond records to put onto the stream */
+ val records = (1 to recordsPerSecond).map { recordNum =>
+ /*
+ * Randomly generate each of the wordsPerRecord words between 0 (inclusive)
+ * and MaxRandomInts (exclusive)
+ */
+ val data = (1 to wordsPerRecord).map(x => {
+ /* Generate the random int */
+ val randomInt = Random.nextInt(MaxRandomInts)
+
+ /* Keep track of the totals */
+ totals(randomInt) += 1
+
+ randomInt.toString()
+ }).mkString(" ")
+
+ /* Create a partitionKey based on recordNum */
+ val partitionKey = s"partitionKey-$recordNum"
+
+ /* Create a PutRecordRequest with an Array[Byte] version of the data */
+ val putRecordRequest = new PutRecordRequest().withStreamName(stream)
+ .withPartitionKey(partitionKey)
+ .withData(ByteBuffer.wrap(data.getBytes()))
+
+ /* Put the record onto the stream and capture the PutRecordResult */
+ val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+ }
+
+ /* Sleep for a second */
+ Thread.sleep(1000)
+ println("Sent " + recordsPerSecond + " records")
+ }
+
+ /* Convert the totals to (index, total) tuple */
+ (0 to (MaxRandomInts - 1)).zip(totals)
+ }
+}
+
+/**
+ * Utility functions for Spark Streaming examples.
+ * This has been lifted from the examples/ project to remove the circular dependency.
+ */
+object StreamingExamples extends Logging {
+
+ /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+ def setStreamingLogLevels() {
+ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ // We first log something to initialize Spark's default logging, then we override the
+ // logging level.
+ logInfo("Setting log level to [WARN] for streaming example." +
+ " To override add a custom log4j.properties to the classpath.")
+ Logger.getRootLogger.setLevel(Level.WARN)
+ }
+ }
+}
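
The producer object above can also be driven directly, e.g. from a REPL, to sanity-check a stream. A minimal sketch, assuming the stream exists and credentials resolve through the DefaultAWSCredentialsProviderChain:

    import org.apache.spark.examples.streaming.KinesisWordCountProducerASL

    // Put 10 records/sec of 5 words each onto the stream (the loop in generate()
    // runs 5 iterations, roughly 5 seconds), then print how often each random
    // digit 0-9 was generated.
    val totals = KinesisWordCountProducerASL.generate(
      "mySparkStream", "https://kinesis.us-east-1.amazonaws.com",
      recordsPerSecond = 10, wordsPerRecord = 5)
    totals.foreach { case (digit, count) => println(s"$digit: $count") }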
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
new file mode 100644
index 0000000000..0b80b611cd
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * This is a helper class for managing checkpoint clocks.
+ *
+ * @param checkpointInterval interval between Kinesis checkpoints
+ * @param currentClock defaults to a new SystemClock if none is passed in (for mocking purposes)
+ */
+private[kinesis] class KinesisCheckpointState(
+ checkpointInterval: Duration,
+ currentClock: Clock = new SystemClock())
+ extends Logging {
+
+ /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
+ val checkpointClock = new ManualClock()
+ checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+
+ /**
+ * Check if it's time to checkpoint based on the current time and the derived time
+ * for the next checkpoint
+ *
+ * @return true if it's time to checkpoint
+ */
+ def shouldCheckpoint(): Boolean = {
+ new SystemClock().currentTime() > checkpointClock.currentTime()
+ }
+
+ /**
+ * Advance the checkpoint clock by the checkpoint interval.
+ */
+ def advanceCheckpoint() = {
+ checkpointClock.addToTime(checkpointInterval.milliseconds)
+ }
+}
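
A sketch of the resulting timing semantics, using the same ManualClock the constructor accepts for mocking (the class is private[kinesis], so this only compiles inside that package, e.g. in the test suite at the end of this commit):

    import org.apache.spark.streaming.Milliseconds
    import org.apache.spark.streaming.util.ManualClock

    val clock = new ManualClock()
    clock.setTime(0)
    // The next checkpoint is scheduled for currentClock.currentTime() + interval = 10.
    val state = new KinesisCheckpointState(Milliseconds(10), clock)
    assert(state.checkpointClock.currentTime() == 10)

    // shouldCheckpoint() compares against real wall-clock time (a new SystemClock),
    // which is far past t=10, so it reports true; advanceCheckpoint() then pushes
    // the target out by one more interval.
    assert(state.shouldCheckpoint())
    state.advanceCheckpoint()
    assert(state.checkpointClock.currentTime() == 20)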
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
new file mode 100644
index 0000000000..1bd1f32429
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.receiver.Receiver
+
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
+ * as described here:
+ * http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers
+ * to run within a Spark Executor.
+ *
+ * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ * by the Kinesis Client Library. If you change the App name or Stream name,
+ * the KCL will throw errors. This usually requires deleting the backing
+ * DynamoDB table with the same name as this Kinesis application.
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ */
+private[kinesis] class KinesisReceiver(
+ appName: String,
+ streamName: String,
+ endpointUrl: String,
+ checkpointInterval: Duration,
+ initialPositionInStream: InitialPositionInStream,
+ storageLevel: StorageLevel)
+ extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
+
+ /*
+ * The following vars are built in the onStart() method which executes in the Spark Worker after
+ * this code is serialized and shipped remotely.
+ */
+
+ /*
+ * workerId should be based on the ip address of the actual Spark Worker where this code runs
+ * (not the Driver's ip address.)
+ */
+ var workerId: String = null
+
+ /*
+ * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ * in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file at the default location (~/.aws/credentials) shared by all
+ * AWS SDKs and the AWS CLI
+ * Instance profile credentials delivered through the Amazon EC2 metadata service
+ */
+ var credentialsProvider: AWSCredentialsProvider = null
+
+ /* KCL config instance. */
+ var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
+
+ /*
+ * RecordProcessorFactory creates impls of IRecordProcessor.
+ * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+ * IRecordProcessor.processRecords() method.
+ * We're using our custom KinesisRecordProcessor in this case.
+ */
+ var recordProcessorFactory: IRecordProcessorFactory = null
+
+ /*
+ * Create a Kinesis Worker.
+ * This is the core client abstraction from the Kinesis Client Library (KCL).
+ * We pass the RecordProcessorFactory from above as well as the KCL config instance.
+ * A Kinesis Worker can process 1..* shards from the given stream - each with its
+ * own RecordProcessor.
+ */
+ var worker: Worker = null
+
+ /**
+ * This is called when the KinesisReceiver starts and must be non-blocking.
+ * The KCL creates and manages the receiving/processing thread pool through the Worker.run()
+ * method.
+ */
+ override def onStart() {
+ workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+ credentialsProvider = new DefaultAWSCredentialsProviderChain()
+ kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
+ credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
+ .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
+ recordProcessorFactory = new IRecordProcessorFactory {
+ override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
+ workerId, new KinesisCheckpointState(checkpointInterval))
+ }
+ worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
+ worker.run()
+ logInfo(s"Started receiver with workerId $workerId")
+ }
+
+ /**
+ * This is called when the KinesisReceiver stops.
+ * The KCL worker.shutdown() method stops the receiving/processing threads.
+ * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+ */
+ override def onStop() {
+ worker.shutdown()
+ logInfo(s"Shut down receiver with workerId $workerId")
+ workerId = null
+ credentialsProvider = null
+ kinesisClientLibConfiguration = null
+ recordProcessorFactory = null
+ worker = null
+ }
+}
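
Applications never construct this receiver directly; KinesisUtils (added below) wraps it. A sketch of that wiring, equivalent to what KinesisUtils.createStream does internally, assuming a StreamingContext ssc and same-package visibility for the private[kinesis] class:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Milliseconds
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    // One receiver; KinesisWordCountASL above creates one of these per shard.
    val stream = ssc.receiverStream(
      new KinesisReceiver(ssc.sc.appName, "mySparkStream",
        "https://kinesis.us-east-1.amazonaws.com", Milliseconds(2000),
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2))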
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
new file mode 100644
index 0000000000..8ecc2d9016
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
+ * This implementation operates on the Array[Byte] from the KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ * @param checkpointState represents the checkpoint state including the next checkpoint time.
+ * It's injected here for mocking purposes.
+ */
+private[kinesis] class KinesisRecordProcessor(
+ receiver: KinesisReceiver,
+ workerId: String,
+ checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
+
+ /* shardId to be populated during initialize() */
+ var shardId: String = _
+
+ /**
+ * The Kinesis Client Library calls this method during IRecordProcessor initialization.
+ *
+ * @param shardId assigned by the KCL to this particular RecordProcessor.
+ */
+ override def initialize(shardId: String) {
+ logInfo(s"Initialize: Initializing workerId $workerId with shardId $shardId")
+ this.shardId = shardId
+ }
+
+ /**
+ * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
+ * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
+ * and Spark Streaming's Receiver.store().
+ *
+ * @param batch list of records from the Kinesis stream shard
+ * @param checkpointer used to update Kinesis when this batch has been processed/stored
+ * in the DStream
+ */
+ override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
+ if (!receiver.isStopped()) {
+ try {
+ /*
+ * Note: If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+ * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+ * internally-configured Spark serializer (kryo, etc).
+ * This is not desirable, so we instead store a raw Array[Byte] and decouple
+ * ourselves from Spark's internal serialization strategy.
+ */
+ batch.foreach(record => receiver.store(record.getData().array()))
+
+ logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+
+ /*
+ * Checkpoint the sequence number of the last record successfully processed/stored
+ * in the batch.
+ * In this implementation, we're checkpointing once the given checkpoint interval has elapsed.
+ * Note that this logic requires that processRecords() be called AND that it's time to
+ * checkpoint. I point this out because there is no background thread running the
+ * checkpointer. Checkpointing is tested and triggered only when a new batch comes in.
+ * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
+ * However, if the worker dies unexpectedly, a checkpoint may not happen.
+ * This could lead to records being processed more than once.
+ */
+ if (checkpointState.shouldCheckpoint()) {
+ /* Perform the checkpoint */
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+ /* Update the next checkpoint time */
+ checkpointState.advanceCheckpoint()
+
+ logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
+ s" records for shardId $shardId")
+ logDebug(s"Checkpoint: Next checkpoint is at " +
+ s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+ }
+ } catch {
+ case e: Throwable => {
+ /*
+ * If there is a failure within the batch, the batch will not be checkpointed.
+ * This will potentially cause records since the last checkpoint to be processed
+ * more than once.
+ */
+ logError(s"Exception: WorkerId $workerId encountered an exception while storing" +
+ s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+
+ /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+ throw e
+ }
+ }
+ } else {
+ /* RecordProcessor has been stopped. */
+ logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
+ s" and shardId $shardId. No more records will be processed.")
+ }
+ }
+
+ /**
+ * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
+ * 1) the stream is resharding by splitting or merging adjacent shards
+ * (ShutdownReason.TERMINATE)
+ * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
+ * (ShutdownReason.ZOMBIE)
+ *
+ * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
+ * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
+ */
+ override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+ logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
+ reason match {
+ /*
+ * TERMINATE Use Case. Checkpoint.
+ * Checkpoint to indicate that all records from the shard have been drained and processed.
+ * It's now OK to read from the new shards that resulted from a resharding event.
+ */
+ case ShutdownReason.TERMINATE =>
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+ /*
+ * ZOMBIE Use Case. NoOp.
+ * No checkpoint because other workers may have taken over and already started processing
+ * the same records.
+ * This may lead to records being processed more than once.
+ */
+ case ShutdownReason.ZOMBIE =>
+
+ /* Unknown reason. NoOp */
+ case _ =>
+ }
+ }
+}
+
+private[kinesis] object KinesisRecordProcessor extends Logging {
+ /**
+ * Retry the given amount of times with a random backoff time (millis) less than the
+ * given maxBackOffMillis
+ *
+ * @param expression expression to evaluate
+ * @param numRetriesLeft number of retries left
+ * @param maxBackOffMillis max millis between retries
+ *
+ * @return evaluation of the given expression
+ * @throws Exception a non-retryable exception, an unexpected exception,
+ * or any exception that persists after numRetriesLeft reaches 0
+ */
+ @annotation.tailrec
+ def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
+ util.Try { expression } match {
+ /* If the function succeeded, evaluate to x. */
+ case util.Success(x) => x
+ /* If the function failed, either retry or throw the exception */
+ case util.Failure(e) => e match {
+ /* Retry: Throttling or other Retryable exception has occurred */
+ case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
+ => {
+ val backOffMillis = Random.nextInt(maxBackOffMillis)
+ Thread.sleep(backOffMillis)
+ logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
+ retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
+ }
+ /* Throw: Shutdown has been requested by the Kinesis Client Library.*/
+ case _: ShutdownException => {
+ logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
+ throw e
+ }
+ /* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
+ case _: InvalidStateException => {
+ logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
+ s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
+ throw e
+ }
+ /* Throw: Unexpected exception has occurred */
+ case _ => {
+ logError(s"Unexpected, non-retryable exception.", e)
+ throw e
+ }
+ }
+ }
+ }
+}
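
A sketch of retryRandom's behavior (again private[kinesis], so same-package code only): the by-name expression parameter is re-evaluated on every attempt, and only the two retryable KCL exceptions trigger backoff:

    import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException

    var attempts = 0
    val result = KinesisRecordProcessor.retryRandom({
      attempts += 1
      if (attempts < 3) throw new ThrottlingException("throttled")
      "ok"
    }, numRetriesLeft = 4, maxBackOffMillis = 100)
    // Two throttled attempts, each followed by a random 0-99 ms sleep, then success.
    assert(result == "ok" && attempts == 3)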
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
new file mode 100644
index 0000000000..713cac0e29
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * :: Experimental ::
+ * Helper object to create Amazon Kinesis input streams.
+ */
+@Experimental
+object KinesisUtils {
+ /**
+ * Create an InputDStream that pulls messages from a Kinesis stream.
+ *
+ * @param ssc StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @return ReceiverInputDStream[Array[Byte]]
+ */
+ def createStream(
+ ssc: StreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ checkpointInterval: Duration,
+ initialPositionInStream: InitialPositionInStream,
+ storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+ ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl,
+ checkpointInterval, initialPositionInStream, storageLevel))
+ }
+
+ /**
+ * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+ *
+ * @param jssc Java StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @return JavaReceiverInputDStream[Array[Byte]]
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ checkpointInterval: Duration,
+ initialPositionInStream: InitialPositionInStream,
+ storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
+ jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName,
+ endpointUrl, checkpointInterval, initialPositionInStream, storageLevel))
+ }
+}
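
End to end, a Scala application needs just one call per shard, mirroring KinesisWordCountASL earlier in this commit (a sketch, with the same imports as that example):

    val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
      "https://kinesis.us-east-1.amazonaws.com", Milliseconds(2000),
      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)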
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000000..87954a31f6
--- /dev/null
+++ b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testKinesisStream() {
+ // Tests the API, does not actually test data receiving
+ JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+
+ ssc.stop();
+ }
+}
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..e01e049595
--- /dev/null
+++ b/extras/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
new file mode 100644
index 0000000000..41dbd64c2b
--- /dev/null
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions.seqAsJavaList
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.scalatest.BeforeAndAfter
+import org.scalatest.Matchers
+import org.scalatest.mock.EasyMockSugar
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ */
+class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
+ with EasyMockSugar {
+
+ val app = "TestKinesisReceiver"
+ val stream = "mySparkStream"
+ val endpoint = "endpoint-url"
+ val workerId = "dummyWorkerId"
+ val shardId = "dummyShardId"
+
+ val record1 = new Record()
+ record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
+ val record2 = new Record()
+ record2.setData(ByteBuffer.wrap("Learning Spark".getBytes()))
+ val batch = List[Record](record1, record2)
+
+ var receiverMock: KinesisReceiver = _
+ var checkpointerMock: IRecordProcessorCheckpointer = _
+ var checkpointClockMock: ManualClock = _
+ var checkpointStateMock: KinesisCheckpointState = _
+ var currentClockMock: Clock = _
+
+ override def beforeFunction() = {
+ receiverMock = mock[KinesisReceiver]
+ checkpointerMock = mock[IRecordProcessorCheckpointer]
+ checkpointClockMock = mock[ManualClock]
+ checkpointStateMock = mock[KinesisCheckpointState]
+ currentClockMock = mock[Clock]
+ }
+
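+  // The tests below use EasyMockSugar's expecting { ... } / whenExecuting { ... } DSL:
+  // expectations recorded inside expecting { } are replayed and then verified against
+  // the calls made inside whenExecuting { }.
+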
+ test("kinesis utils api") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+    // Tests the API; does not actually test data receiving
+ val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+ ssc.stop()
+ }
+
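+  // The expectations below pin down the processRecords contract: store each record's
+  // payload bytes via the receiver, then checkpoint through the KCL checkpointer once
+  // the checkpoint state reports that the checkpoint interval has elapsed.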
+ test("process records including store and checkpoint") {
+ expecting {
+ receiverMock.isStopped().andReturn(false).once()
+ receiverMock.store(record1.getData().array()).once()
+ receiverMock.store(record2.getData().array()).once()
+ checkpointStateMock.shouldCheckpoint().andReturn(true).once()
+ checkpointerMock.checkpoint().once()
+ checkpointStateMock.advanceCheckpoint().once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+
+ test("shouldn't store and checkpoint when receiver is stopped") {
+ expecting {
+ receiverMock.isStopped().andReturn(true).once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+
+ test("shouldn't checkpoint when exception occurs during store") {
+ expecting {
+ receiverMock.isStopped().andReturn(false).once()
+ receiverMock.store(record1.getData().array()).andThrow(new RuntimeException()).once()
+ }
+ whenExecuting(receiverMock, checkpointerMock, checkpointStateMock) {
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+ }
+ }
+
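+  // A minimal sketch of the KinesisCheckpointState behavior the following tests assume
+  // (the exact comparison operator is an assumption):
+  //   construction: checkpointClock is set to currentClock.currentTime() + intervalMillis
+  //   shouldCheckpoint(): currentClock.currentTime() has passed checkpointClock.currentTime()
+  //   advanceCheckpoint(): adds intervalMillis to checkpointClock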
+ test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointIntervalMillis = 10
+      val checkpointState = new KinesisCheckpointState(
+        Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ }
+ }
+
+ test("should checkpoint if we have exceeded the checkpoint interval") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MinValue), currentClockMock)
+ assert(checkpointState.shouldCheckpoint())
+ }
+ }
+
+ test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointState = new KinesisCheckpointState(Milliseconds(Long.MaxValue), currentClockMock)
+ assert(!checkpointState.shouldCheckpoint())
+ }
+ }
+
+ test("should add to time when advancing checkpoint") {
+ expecting {
+ currentClockMock.currentTime().andReturn(0).once()
+ }
+ whenExecuting(currentClockMock) {
+ val checkpointIntervalMillis = 10
+      val checkpointState = new KinesisCheckpointState(
+        Milliseconds(checkpointIntervalMillis), currentClockMock)
+ assert(checkpointState.checkpointClock.currentTime() == checkpointIntervalMillis)
+ checkpointState.advanceCheckpoint()
+ assert(checkpointState.checkpointClock.currentTime() == (2 * checkpointIntervalMillis))
+ }
+ }
+
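+  // Per the KCL contract: TERMINATE means the shard was closed (e.g. after a split or
+  // merge) and the processor should checkpoint its final position; ZOMBIE means the
+  // worker lost its lease, so checkpointing must be skipped.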
+ test("shutdown should checkpoint if the reason is TERMINATE") {
+ expecting {
+ checkpointerMock.checkpoint().once()
+ }
+ whenExecuting(checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ val reason = ShutdownReason.TERMINATE
+ recordProcessor.shutdown(checkpointerMock, reason)
+ }
+ }
+
+ test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+ expecting {
+ }
+ whenExecuting(checkpointerMock, checkpointStateMock) {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId,
+ checkpointStateMock)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+ }
+ }
+
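+  // retryRandom(expression, numRetries, maxBackOffMillis) is assumed (parameter names
+  // are illustrative) to re-evaluate the expression after a random backoff on
+  // ThrottlingException or KinesisClientLibDependencyException, and to rethrow
+  // ShutdownException, InvalidStateException, and any other exception immediately.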
+ test("retry success on first attempt") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry success on second attempt after a Kinesis throttling exception") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andThrow(new ThrottlingException("error message"))
+ .andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry success on second attempt after a Kinesis dependency exception") {
+ val expectedIsStopped = false
+ expecting {
+ receiverMock.isStopped().andThrow(new KinesisClientLibDependencyException("error message"))
+ .andReturn(expectedIsStopped).once()
+ }
+ whenExecuting(receiverMock) {
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+ }
+ }
+
+ test("retry failed after a shutdown exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new ShutdownException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[ShutdownException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after an invalid state exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new InvalidStateException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[InvalidStateException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after unexpected exception") {
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new RuntimeException("error message")).once()
+ }
+ whenExecuting(checkpointerMock) {
+ intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ }
+ }
+
+ test("retry failed after exhausing all retries") {
+ val expectedErrorMessage = "final try error message"
+ expecting {
+ checkpointerMock.checkpoint().andThrow(new ThrottlingException("error message"))
+ .andThrow(new ThrottlingException(expectedErrorMessage)).once()
+ }
+ whenExecuting(checkpointerMock) {
+ val exception = intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ exception.getMessage().shouldBe(expectedErrorMessage)
+ }
+ }
+}