From 3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 31 Jul 2015 12:09:48 -0700 Subject: [SPARK-8564] [STREAMING] Add the Python API for Kinesis This PR adds the Python API for Kinesis, including a Python example and a simple unit test. Author: zsxwing Closes #6955 from zsxwing/kinesis-python and squashes the following commits: e42e471 [zsxwing] Merge branch 'master' into kinesis-python 455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module 32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 5082d28 [zsxwing] Fix the syntax error for Python 2.6 fca416b [zsxwing] Fix wrong comparison 96670ff [zsxwing] Fix the compilation error after merging master 756a128 [zsxwing] Merge branch 'master' into kinesis-python 6c37395 [zsxwing] Print stack trace for debug 7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS cc9d071 [zsxwing] Fix the python test errors 466b425 [zsxwing] Add python tests for Kinesis e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 3da2601 [zsxwing] Fix the kinesis folder 687446b [zsxwing] Fix the error message and the maven output path add2beb [zsxwing] Merge branch 'master' into kinesis-python 4957c0b [zsxwing] Add the Python API for Kinesis --- python/pyspark/streaming/kinesis.py | 112 ++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 python/pyspark/streaming/kinesis.py (limited to 'python/pyspark/streaming/kinesis.py') diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py new file mode 100644 index 0000000000..bcfe2703fe --- /dev/null +++ b/python/pyspark/streaming/kinesis.py @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from py4j.java_gateway import Py4JJavaError + +from pyspark.serializers import PairDeserializer, NoOpSerializer +from pyspark.storagelevel import StorageLevel +from pyspark.streaming import DStream + +__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder'] + + +def utf8_decoder(s): + """ Decode the unicode as UTF-8 """ + return s and s.decode('utf-8') + + +class KinesisUtils(object): + + @staticmethod + def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName, + initialPositionInStream, checkpointInterval, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, + awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder): + """ + Create an input stream that pulls messages from a Kinesis stream. This uses the + Kinesis Client Library (KCL) to pull messages from Kinesis. + + Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is + enabled. Make sure that your checkpoint directory is secure. + + :param ssc: StreamingContext object + :param kinesisAppName: Kinesis application name used by the Kinesis Client Library (KCL) to + update DynamoDB + :param streamName: Kinesis stream name + :param endpointUrl: Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) + :param regionName: Name of region used by the Kinesis Client Library (KCL) to update + DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) + :param initialPositionInStream: In the absence of Kinesis checkpoint info, this is the + worker's initial starting position in the stream. The + values are either the beginning of the stream per Kinesis' + limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or + the tip of the stream (InitialPositionInStream.LATEST). + :param checkpointInterval: Checkpoint interval for Kinesis checkpointing. See the Kinesis + Spark Streaming documentation for more details on the different + types of checkpoints. + :param storageLevel: Storage level to use for storing the received objects (default is + StorageLevel.MEMORY_AND_DISK_2) + :param awsAccessKeyId: AWS AccessKeyId (default is None. If None, will use + DefaultAWSCredentialsProviderChain) + :param awsSecretKey: AWS SecretKey (default is None. If None, will use + DefaultAWSCredentialsProviderChain) + :param decoder: A function used to decode value (default is utf8_decoder) + :return: A DStream object + """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + jduration = ssc._jduration(checkpointInterval) + + try: + # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils + helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ + .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper") + helper = helperClass.newInstance() + jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl, + regionName, initialPositionInStream, jduration, jlevel, + awsAccessKeyId, awsSecretKey) + except Py4JJavaError as e: + if 'ClassNotFoundException' in str(e.java_exception): + KinesisUtils._printErrorMsg(ssc.sparkContext) + raise e + stream = DStream(jstream, ssc, NoOpSerializer()) + return stream.map(lambda v: decoder(v)) + + @staticmethod + def _printErrorMsg(sc): + print(""" +________________________________________________________________________________________________ + + Spark Streaming's Kinesis libraries not found in class path. Try one of the following. + + 1. Include the Kinesis library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s. + Then, include the jar in the spark-submit command as + + $ bin/spark-submit --jars ... + +________________________________________________________________________________________________ + +""" % (sc.version, sc.version)) + + +class InitialPositionInStream(object): + LATEST, TRIM_HORIZON = (0, 1) -- cgit v1.2.3