aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/kinesis.py
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-31 12:09:48 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-31 12:09:48 -0700
commit3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch)
tree5f362cf13352f4a06ea05e2f3221674147587e75 /python/pyspark/streaming/kinesis.py
parent39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff)
downloadspark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.gz
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.bz2
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.zip
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test. Author: zsxwing <zsxwing@gmail.com> Closes #6955 from zsxwing/kinesis-python and squashes the following commits: e42e471 [zsxwing] Merge branch 'master' into kinesis-python 455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module 32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 5082d28 [zsxwing] Fix the syntax error for Python 2.6 fca416b [zsxwing] Fix wrong comparison 96670ff [zsxwing] Fix the compilation error after merging master 756a128 [zsxwing] Merge branch 'master' into kinesis-python 6c37395 [zsxwing] Print stack trace for debug 7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS cc9d071 [zsxwing] Fix the python test errors 466b425 [zsxwing] Add python tests for Kinesis e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 3da2601 [zsxwing] Fix the kinesis folder 687446b [zsxwing] Fix the error message and the maven output path add2beb [zsxwing] Merge branch 'master' into kinesis-python 4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'python/pyspark/streaming/kinesis.py')
-rw-r--r--python/pyspark/streaming/kinesis.py112
1 files changed, 112 insertions, 0 deletions
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
new file mode 100644
index 0000000000..bcfe2703fe
--- /dev/null
+++ b/python/pyspark/streaming/kinesis.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming import DStream
+
+__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+ """ Decode the unicode as UTF-8 """
+ return s and s.decode('utf-8')
+
+
+class KinesisUtils(object):
+
+ @staticmethod
+ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+ awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
+ """
+ Create an input stream that pulls messages from a Kinesis stream. This uses the
+ Kinesis Client Library (KCL) to pull messages from Kinesis.
+
+ Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is
+ enabled. Make sure that your checkpoint directory is secure.
+
+ :param ssc: StreamingContext object
+ :param kinesisAppName: Kinesis application name used by the Kinesis Client Library (KCL) to
+ update DynamoDB
+ :param streamName: Kinesis stream name
+ :param endpointUrl: Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ :param regionName: Name of region used by the Kinesis Client Library (KCL) to update
+ DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ :param initialPositionInStream: In the absence of Kinesis checkpoint info, this is the
+ worker's initial starting position in the stream. The
+ values are either the beginning of the stream per Kinesis'
+ limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
+ the tip of the stream (InitialPositionInStream.LATEST).
+ :param checkpointInterval: Checkpoint interval for Kinesis checkpointing. See the Kinesis
+ Spark Streaming documentation for more details on the different
+ types of checkpoints.
+ :param storageLevel: Storage level to use for storing the received objects (default is
+ StorageLevel.MEMORY_AND_DISK_2)
+ :param awsAccessKeyId: AWS AccessKeyId (default is None. If None, will use
+ DefaultAWSCredentialsProviderChain)
+ :param awsSecretKey: AWS SecretKey (default is None. If None, will use
+ DefaultAWSCredentialsProviderChain)
+ :param decoder: A function used to decode value (default is utf8_decoder)
+ :return: A DStream object
+ """
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ jduration = ssc._jduration(checkpointInterval)
+
+ try:
+ # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
+ helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+ .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
+ helper = helperClass.newInstance()
+ jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+ regionName, initialPositionInStream, jduration, jlevel,
+ awsAccessKeyId, awsSecretKey)
+ except Py4JJavaError as e:
+ if 'ClassNotFoundException' in str(e.java_exception):
+ KinesisUtils._printErrorMsg(ssc.sparkContext)
+ raise e
+ stream = DStream(jstream, ssc, NoOpSerializer())
+ return stream.map(lambda v: decoder(v))
+
+ @staticmethod
+ def _printErrorMsg(sc):
+ print("""
+________________________________________________________________________________________________
+
+ Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
+
+ 1. Include the Kinesis library and its dependencies with in the
+ spark-submit command as
+
+ $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
+
+ 2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+ Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
+ Then, include the jar in the spark-submit command as
+
+ $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
+
+
+class InitialPositionInStream(object):
+ LATEST, TRIM_HORIZON = (0, 1)