-rwxr-xr-x  dev/run-tests.py                                                                            3
-rw-r--r--  dev/sparktestsupport/modules.py                                                             9
-rw-r--r--  docs/streaming-kinesis-integration.md                                                      19
-rw-r--r--  extras/kinesis-asl-assembly/pom.xml                                                       103
-rw-r--r--  extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py             81
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala  19
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala    78
-rw-r--r--  pom.xml                                                                                     1
-rw-r--r--  project/SparkBuild.scala                                                                    6
-rw-r--r--  python/pyspark/streaming/kinesis.py                                                       112
-rw-r--r--  python/pyspark/streaming/tests.py                                                          86
11 files changed, 492 insertions, 25 deletions
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 29420da9aa..b6d181418f 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -301,7 +301,8 @@ def build_spark_sbt(hadoop_version):
sbt_goals = ["package",
"assembly/assembly",
"streaming-kafka-assembly/assembly",
- "streaming-flume-assembly/assembly"]
+ "streaming-flume-assembly/assembly",
+ "streaming-kinesis-asl-assembly/assembly"]
profiles_and_goals = build_profiles + sbt_goals
print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 44600cb952..956dc81b62 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -138,6 +138,7 @@ streaming_kinesis_asl = Module(
dependencies=[],
source_file_regexes=[
"extras/kinesis-asl/",
+ "extras/kinesis-asl-assembly/",
],
build_profile_flags=[
"-Pkinesis-asl",
@@ -300,7 +301,13 @@ pyspark_sql = Module(
pyspark_streaming = Module(
name="pyspark-streaming",
- dependencies=[pyspark_core, streaming, streaming_kafka, streaming_flume_assembly],
+ dependencies=[
+ pyspark_core,
+ streaming,
+ streaming_kafka,
+ streaming_flume_assembly,
+ streaming_kinesis_asl
+ ],
source_file_regexes=[
"python/pyspark/streaming"
],
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index aa9749afbc..a7bcaec6fc 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -52,6 +52,17 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
</div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+ kinesisStream = KinesisUtils.createStream(
+ streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+ [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
+
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the next subsection for instructions to run the example.
+
+ </div>
</div>
- `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
@@ -136,6 +147,14 @@ To run the example,
bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
</div>
+ <div data-lang="python" markdown="1">
+
+        bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
+            spark-streaming-kinesis-asl-assembly-*.jar \
+ extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
+
+ </div>
</div>
This will wait for data to be received from the Kinesis stream.
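For concreteness, here is a minimal self-contained PySpark driver built around the documented createStream call above. This is a sketch only: the app name, stream name, endpoint, and region are placeholders, and AWS credentials are assumed to come from the DefaultAWSCredentialsProviderChain.

    # Minimal sketch of the documented Python Kinesis API; all names are placeholders.
    from pyspark import SparkContext
    from pyspark.storagelevel import StorageLevel
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    sc = SparkContext(appName="KinesisDocsExample")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    lines = KinesisUtils.createStream(
        ssc, "KinesisDocsExample", "mySparkStream",
        "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
        InitialPositionInStream.TRIM_HORIZON,  # replay the retained backlog
        checkpointInterval=2,                  # seconds between Kinesis checkpoints
        storageLevel=StorageLevel.MEMORY_AND_DISK_2)

    lines.pprint()
    ssc.start()
    ssc.awaitTermination()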
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
new file mode 100644
index 0000000000..70d2c9c58f
--- /dev/null
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl-assembly_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Kinesis Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
new file mode 100644
index 0000000000..f428f64da3
--- /dev/null
+++ b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Consumes messages from an Amazon Kinesis stream and does wordcount.
+
+ This example spins up 1 Kinesis Receiver per shard for the given stream.
+ It then starts pulling from the last checkpointed sequence number of the given stream.
+
+ Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
+ <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ <endpoint-url> endpoint of the Kinesis service
+ (e.g. https://kinesis.us-east-1.amazonaws.com)
+ <region-name> name of the region the Kinesis stream is in (e.g. us-east-1)
+
+ Example:
+ # export AWS keys if necessary
+ $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ $ export AWS_SECRET_KEY=<your-secret-key>
+
+ # run the example
+ $ bin/spark-submit --jars extras/kinesis-asl-assembly/target/scala-*/\
+ spark-streaming-kinesis-asl-assembly-*.jar \
+ extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+ myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com us-east-1
+
+ There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ onto the Kinesis stream.
+
+ This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ in the following order:
+ Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ Java System Properties - aws.accessKeyId and aws.secretKey
+ Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ Instance profile credentials - delivered through the Amazon EC2 metadata service
+ For more information, see
+ http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+
+ See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ the Kinesis Spark Streaming integration.
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+if __name__ == "__main__":
+ if len(sys.argv) != 5:
+ print(
+ "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
+ file=sys.stderr)
+ sys.exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
+ ssc = StreamingContext(sc, 1)
+ appName, streamName, endpointUrl, regionName = sys.argv[1:]
+ lines = KinesisUtils.createStream(
+ ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
+ counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a+b)
+ counts.pprint()
+
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index ca39358b75..255ac27f79 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,9 +36,15 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
-private class KinesisTestUtils(
- val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
- _regionName: String = "") extends Logging {
+private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
+
+ def this() {
+ this("https://kinesis.us-west-2.amazonaws.com", "")
+ }
+
+ def this(endpointUrl: String) {
+ this(endpointUrl, "")
+ }
val regionName = if (_regionName.length == 0) {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
@@ -117,6 +123,13 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}
+ /**
+ * Expose a Python friendly API.
+ */
+ def pushData(testData: java.util.List[Int]): Unit = {
+ pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
+ }
+
def deleteStream(): Unit = {
try {
if (streamCreated) {
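Aside: the new pushData overload exists because Py4J hands a Python list across the gateway as a java.util.List, which the original Seq[Int] signature cannot accept. A minimal sketch of the Python-side call path, assuming a live SparkContext `sc` (the same pattern appears in tests.py later in this diff):

    # Load KinesisTestUtils through the context class loader, as the tests do.
    utils_clz = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
    utils = utils_clz.newInstance()
    utils.createStream()
    # Py4J converts this Python list into a java.util.List, which only the
    # new pushData(java.util.List[Int]) overload can receive.
    utils.pushData([1, 2, 3])
    utils.deleteStream()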
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index e5acab5018..7dab17eba8 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -86,19 +86,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
ssc: StreamingContext,
@@ -130,7 +130,7 @@ object KinesisUtils {
* - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
* [[org.apache.spark.SparkConf]].
*
- * @param ssc Java StreamingContext object
+ * @param ssc StreamingContext object
* @param streamName Kinesis stream name
* @param endpointUrl Endpoint url of Kinesis service
* (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -175,15 +175,15 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
*/
@@ -206,8 +206,8 @@ object KinesisUtils {
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
* Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the Kinesis Client Library
@@ -216,19 +216,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
jssc: JavaStreamingContext,
@@ -297,3 +297,49 @@ object KinesisUtils {
}
}
}
+
+/**
+ * This is a helper class that exposes the KinesisUtils methods through a more Python-friendly
+ * class and method signature, so that they can be easily instantiated and called from
+ * Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+ def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+ initialPositionInStream match {
+ case 0 => InitialPositionInStream.LATEST
+ case 1 => InitialPositionInStream.TRIM_HORIZON
+ case _ => throw new IllegalArgumentException(
+ "Illegal InitialPositionInStream. Please use " +
+ "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
+ }
+ }
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: Int,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ if (awsAccessKeyId == null && awsSecretKey != null) {
+ throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
+ }
+ if (awsAccessKeyId != null && awsSecretKey == null) {
+ throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
+ }
+ if (awsAccessKeyId == null && awsSecretKey == null) {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+ } else {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+ awsAccessKeyId, awsSecretKey)
+ }
+ }
+
+}
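On the Python side, the initial position travels as a plain int (the constants defined in python/pyspark/streaming/kinesis.py later in this diff), and the credential arguments must be given as a pair or not at all. A small sketch of both contracts:

    # The Python constants mirror the match in getInitialPositionInStream():
    #   0 -> InitialPositionInStream.LATEST
    #   1 -> InitialPositionInStream.TRIM_HORIZON
    from pyspark.streaming.kinesis import InitialPositionInStream

    assert InitialPositionInStream.LATEST == 0
    assert InitialPositionInStream.TRIM_HORIZON == 1

    # Supplying only one half of the credential pair trips the validation above;
    # from Python it surfaces as a Py4JJavaError wrapping IllegalArgumentException.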
diff --git a/pom.xml b/pom.xml
index 35fc8c44bc..e351c7c19d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1642,6 +1642,7 @@
<id>kinesis-asl</id>
<modules>
<module>extras/kinesis-asl</module>
+ <module>extras/kinesis-asl-assembly</module>
</modules>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 61a05d375d..9a33baa7c6 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))
- val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) =
- Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly")
+ val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
+ Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
.map(ProjectRef(buildLocation, _))
val tools = ProjectRef(buildLocation, "tools")
@@ -382,7 +382,7 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
- if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) {
+ if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
new file mode 100644
index 0000000000..bcfe2703fe
--- /dev/null
+++ b/python/pyspark/streaming/kinesis.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming import DStream
+
+__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+ """ Decode the unicode as UTF-8 """
+ return s and s.decode('utf-8')
+
+
+class KinesisUtils(object):
+
+ @staticmethod
+ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+ awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
+ """
+ Create an input stream that pulls messages from a Kinesis stream. This uses the
+ Kinesis Client Library (KCL) to pull messages from Kinesis.
+
+ Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is
+ enabled. Make sure that your checkpoint directory is secure.
+
+ :param ssc: StreamingContext object
+ :param kinesisAppName: Kinesis application name used by the Kinesis Client Library (KCL) to
+ update DynamoDB
+ :param streamName: Kinesis stream name
+ :param endpointUrl: Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ :param regionName: Name of region used by the Kinesis Client Library (KCL) to update
+ DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ :param initialPositionInStream: In the absence of Kinesis checkpoint info, this is the
+ worker's initial starting position in the stream. The
+ values are either the beginning of the stream per Kinesis'
+ limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
+ the tip of the stream (InitialPositionInStream.LATEST).
+ :param checkpointInterval: Checkpoint interval for Kinesis checkpointing. See the Kinesis
+ Spark Streaming documentation for more details on the different
+ types of checkpoints.
+ :param storageLevel: Storage level to use for storing the received objects (default is
+ StorageLevel.MEMORY_AND_DISK_2)
+ :param awsAccessKeyId: AWS AccessKeyId (default is None. If None, will use
+ DefaultAWSCredentialsProviderChain)
+ :param awsSecretKey: AWS SecretKey (default is None. If None, will use
+ DefaultAWSCredentialsProviderChain)
+ :param decoder: A function used to decode value (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ jduration = ssc._jduration(checkpointInterval)
+
+ try:
+ # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+ regionName, initialPositionInStream, jduration, jlevel,
+ awsAccessKeyId, awsSecretKey)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KinesisUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ stream = DStream(jstream, ssc, NoOpSerializer())
+ return stream.map(lambda v: decoder(v))
+
+ @staticmethod
+ def _printErrorMsg(sc):
+ print("""
+________________________________________________________________________________________________
+
+ Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
+
+ 1. Include the Kinesis library and its dependencies within the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
+
+
+class InitialPositionInStream(object):
+ LATEST, TRIM_HORIZON = (0, 1)
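Because createStream takes a decoder argument, callers are not tied to utf8_decoder. A sketch of swapping in a JSON decoder, assuming a live StreamingContext `ssc` and a hypothetical stream carrying JSON payloads:

    import json

    from pyspark.storagelevel import StorageLevel
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    def json_decoder(raw):
        # raw is the record payload as bytes; None/empty passes through unchanged
        return raw and json.loads(raw.decode('utf-8'))

    events = KinesisUtils.createStream(
        ssc, "myJsonApp", "myJsonStream",
        "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, checkpointInterval=10,
        storageLevel=StorageLevel.MEMORY_AND_DISK_2,
        decoder=json_decoder)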
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4ecae1e4bf..5cd544b214 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -36,9 +36,11 @@ else:
import unittest
from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.streaming.flume import FlumeUtils
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
class PySparkStreamingTestCase(unittest.TestCase):
@@ -891,6 +893,67 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+class KinesisStreamTests(PySparkStreamingTestCase):
+
+ def test_kinesis_stream_api(self):
+ # Don't start the StreamingContext because we cannot test it in Jenkins
+ kinesisStream1 = KinesisUtils.createStream(
+ self.ssc, "myAppNam", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
+ kinesisStream2 = KinesisUtils.createStream(
+ self.ssc, "myAppNam", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+ InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
+ "awsAccessKey", "awsSecretKey")
+
+ def test_kinesis_stream(self):
+ if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
+ print("Skip test_kinesis_stream")
+ return
+
+ import random
+ kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
+ kinesisTestUtilsClz = \
+ self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+ .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
+ kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+ try:
+ kinesisTestUtils.createStream()
+ aWSCredentials = kinesisTestUtils.getAWSCredentials()
+ stream = KinesisUtils.createStream(
+ self.ssc, kinesisAppName, kinesisTestUtils.streamName(),
+ kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(),
+ InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
+ aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey())
+
+ outputBuffer = []
+
+ def get_output(_, rdd):
+ for e in rdd.collect():
+ outputBuffer.append(e)
+
+ stream.foreachRDD(get_output)
+ self.ssc.start()
+
+ testData = [i for i in range(1, 11)]
+ expectedOutput = set([str(i) for i in testData])
+ start_time = time.time()
+ while time.time() - start_time < 120:
+ kinesisTestUtils.pushData(testData)
+ if expectedOutput == set(outputBuffer):
+ break
+ time.sleep(10)
+ self.assertEqual(expectedOutput, set(outputBuffer))
+ except:
+ import traceback
+ traceback.print_exc()
+ raise
+ finally:
+ kinesisTestUtils.deleteStream()
+ kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
+
def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
@@ -926,10 +989,31 @@ def search_flume_assembly_jar():
else:
return jars[0]
+
+def search_kinesis_asl_assembly_jar():
+ SPARK_HOME = os.environ["SPARK_HOME"]
+ kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
+ jars = glob.glob(
+ os.path.join(kinesis_asl_assembly_dir,
+ "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+ if not jars:
+ raise Exception(
+ ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
+ kinesis_asl_assembly_dir) + "You need to build Spark with "
+ "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
+ "or 'build/mvn -Pkinesis-asl package' before running this test")
+ elif len(jars) > 1:
+ raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
+ "remove all but one") % kinesis_asl_assembly_dir)
+ else:
+ return jars[0]
+
+
if __name__ == "__main__":
kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar()
- jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+ kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+ jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
unittest.main()