aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-31 12:09:48 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-31 12:09:48 -0700
commit3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch)
tree5f362cf13352f4a06ea05e2f3221674147587e75 /extras
parent39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff)
downloadspark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.gz
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.bz2
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.zip
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test. Author: zsxwing <zsxwing@gmail.com> Closes #6955 from zsxwing/kinesis-python and squashes the following commits: e42e471 [zsxwing] Merge branch 'master' into kinesis-python 455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module 32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 5082d28 [zsxwing] Fix the syntax error for Python 2.6 fca416b [zsxwing] Fix wrong comparison 96670ff [zsxwing] Fix the compilation error after merging master 756a128 [zsxwing] Merge branch 'master' into kinesis-python 6c37395 [zsxwing] Print stack trace for debug 7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS cc9d071 [zsxwing] Fix the python test errors 466b425 [zsxwing] Add python tests for Kinesis e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 3da2601 [zsxwing] Fix the kinesis folder 687446b [zsxwing] Fix the error message and the maven output path add2beb [zsxwing] Merge branch 'master' into kinesis-python 4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl-assembly/pom.xml103
-rw-r--r--extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py81
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala19
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala78
4 files changed, 262 insertions, 19 deletions
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
new file mode 100644
index 0000000000..70d2c9c58f
--- /dev/null
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl-assembly_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Kinesis Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+</build>
+</project>
+
diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
new file mode 100644
index 0000000000..f428f64da3
--- /dev/null
+++ b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Consumes messages from a Amazon Kinesis streams and does wordcount.
+
+ This example spins up 1 Kinesis Receiver per shard for the given stream.
+ It then starts pulling from the last checkpointed sequence number of the given stream.
+
+ Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
+ <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ <stream-name> name of the Kinesis stream (ie. mySparkStream)
+ <endpoint-url> endpoint of the Kinesis service
+ (e.g. https://kinesis.us-east-1.amazonaws.com)
+
+
+ Example:
+ # export AWS keys if necessary
+ $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ $ export AWS_SECRET_KEY=<your-secret-key>
+
+ # run the example
+ $ bin/spark-submit -jar extras/kinesis-asl/target/scala-*/\
+ spark-streaming-kinesis-asl-assembly_*.jar \
+ extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+ myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com
+
+ There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ onto the Kinesis stream.
+
+ This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ in the following order:
+ Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ Java System Properties - aws.accessKeyId and aws.secretKey
+ Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ Instance profile credentials - delivered through the Amazon EC2 metadata service
+ For more information, see
+ http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+
+ See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ the Kinesis Spark Streaming integration.
+"""
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+if __name__ == "__main__":
+ if len(sys.argv) != 5:
+ print(
+ "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
+ file=sys.stderr)
+ sys.exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
+ ssc = StreamingContext(sc, 1)
+ appName, streamName, endpointUrl, regionName = sys.argv[1:]
+ lines = KinesisUtils.createStream(
+ ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
+ counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a+b)
+ counts.pprint()
+
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index ca39358b75..255ac27f79 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,9 +36,15 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
-private class KinesisTestUtils(
- val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
- _regionName: String = "") extends Logging {
+private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
+
+ def this() {
+ this("https://kinesis.us-west-2.amazonaws.com", "")
+ }
+
+ def this(endpointUrl: String) {
+ this(endpointUrl, "")
+ }
val regionName = if (_regionName.length == 0) {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
@@ -117,6 +123,13 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}
+ /**
+ * Expose a Python friendly API.
+ */
+ def pushData(testData: java.util.List[Int]): Unit = {
+ pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
+ }
+
def deleteStream(): Unit = {
try {
if (streamCreated) {
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index e5acab5018..7dab17eba8 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -86,19 +86,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
ssc: StreamingContext,
@@ -130,7 +130,7 @@ object KinesisUtils {
* - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
* [[org.apache.spark.SparkConf]].
*
- * @param ssc Java StreamingContext object
+ * @param ssc StreamingContext object
* @param streamName Kinesis stream name
* @param endpointUrl Endpoint url of Kinesis service
* (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -175,15 +175,15 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
*/
@@ -206,8 +206,8 @@ object KinesisUtils {
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
* Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the Kinesis Client Library
@@ -216,19 +216,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
jssc: JavaStreamingContext,
@@ -297,3 +297,49 @@ object KinesisUtils {
}
}
}
+
+/**
+ * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+ def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+ initialPositionInStream match {
+ case 0 => InitialPositionInStream.LATEST
+ case 1 => InitialPositionInStream.TRIM_HORIZON
+ case _ => throw new IllegalArgumentException(
+ "Illegal InitialPositionInStream. Please use " +
+ "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
+ }
+ }
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: Int,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ if (awsAccessKeyId == null && awsSecretKey != null) {
+ throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
+ }
+ if (awsAccessKeyId != null && awsSecretKey == null) {
+ throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
+ }
+ if (awsAccessKeyId == null && awsSecretKey == null) {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+ } else {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+ awsAccessKeyId, awsSecretKey)
+ }
+ }
+
+}