author     zsxwing <zsxwing@gmail.com>  2015-07-31 12:09:48 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-07-31 12:09:48 -0700
commit     3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch)
tree       5f362cf13352f4a06ea05e2f3221674147587e75 /python
parent     39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff)
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test.

Author: zsxwing <zsxwing@gmail.com>

Closes #6955 from zsxwing/kinesis-python and squashes the following commits:

e42e471 [zsxwing] Merge branch 'master' into kinesis-python
455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module
32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
5082d28 [zsxwing] Fix the syntax error for Python 2.6
fca416b [zsxwing] Fix wrong comparison
96670ff [zsxwing] Fix the compilation error after merging master
756a128 [zsxwing] Merge branch 'master' into kinesis-python
6c37395 [zsxwing] Print stack trace for debug
7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS
cc9d071 [zsxwing] Fix the python test errors
466b425 [zsxwing] Add python tests for Kinesis
e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
3da2601 [zsxwing] Fix the kinesis folder
687446b [zsxwing] Fix the error message and the maven output path
add2beb [zsxwing] Merge branch 'master' into kinesis-python
4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/kinesis.py  112
-rw-r--r--  python/pyspark/streaming/tests.py     86
2 files changed, 197 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
new file mode 100644
index 0000000000..bcfe2703fe
--- /dev/null
+++ b/python/pyspark/streaming/kinesis.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from py4j.java_gateway import Py4JJavaError
+
+from pyspark.serializers import PairDeserializer, NoOpSerializer
+from pyspark.storagelevel import StorageLevel
+from pyspark.streaming import DStream
+
+__all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
+
+
+def utf8_decoder(s):
+    """ Decode bytes as UTF-8, or return None if the input is None """
+    return s and s.decode('utf-8')
+
+
+class KinesisUtils(object):
+
+    @staticmethod
+    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
+                     initialPositionInStream, checkpointInterval,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
+                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder):
+        """
+        Create an input stream that pulls messages from a Kinesis stream. This uses the
+        Kinesis Client Library (KCL) to pull messages from Kinesis.
+
+        Note: The given AWS credentials will get saved in DStream checkpoints if checkpointing is
+        enabled. Make sure that your checkpoint directory is secure.
+
+        :param ssc: StreamingContext object
+        :param kinesisAppName: Kinesis application name used by the Kinesis Client Library (KCL)
+                               to update DynamoDB
+        :param streamName: Kinesis stream name
+        :param endpointUrl: URL of the Kinesis service
+                            (e.g., https://kinesis.us-east-1.amazonaws.com)
+        :param regionName: Name of region used by the Kinesis Client Library (KCL) to update
+                           DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+        :param initialPositionInStream: In the absence of Kinesis checkpoint info, this is the
+                                        worker's initial starting position in the stream. The
+                                        values are either the beginning of the stream per Kinesis'
+                                        limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
+                                        the tip of the stream (InitialPositionInStream.LATEST).
+        :param checkpointInterval: Checkpoint interval for Kinesis checkpointing. See the Kinesis
+                                   Spark Streaming documentation for more details on the different
+                                   types of checkpoints.
+        :param storageLevel: Storage level to use for storing the received objects (default is
+                             StorageLevel.MEMORY_AND_DISK_2)
+        :param awsAccessKeyId: AWS AccessKeyId (default is None; if None, the
+                               DefaultAWSCredentialsProviderChain is used)
+        :param awsSecretKey: AWS SecretKey (default is None; if None, the
+                             DefaultAWSCredentialsProviderChain is used)
+        :param decoder: A function used to decode each value (default is utf8_decoder)
+        :return: A DStream object
+        """
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        jduration = ssc._jduration(checkpointInterval)
+
+        try:
+            # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
+            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
+                .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
+            helper = helperClass.newInstance()
+            jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+                                          regionName, initialPositionInStream, jduration, jlevel,
+                                          awsAccessKeyId, awsSecretKey)
+        except Py4JJavaError as e:
+            if 'ClassNotFoundException' in str(e.java_exception):
+                KinesisUtils._printErrorMsg(ssc.sparkContext)
+            raise e
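+        # Records arrive from the JVM as raw bytes (hence NoOpSerializer); map the
+        # user-supplied decoder over the stream to turn each record into a Python value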
+        stream = DStream(jstream, ssc, NoOpSerializer())
+        return stream.map(lambda v: decoder(v))
+
+    @staticmethod
+    def _printErrorMsg(sc):
+        print("""
+________________________________________________________________________________________________
+
+  Spark Streaming's Kinesis libraries not found in class path. Try one of the following.
+
+  1. Include the Kinesis library and its dependencies with the
+     spark-submit command as
+
+     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl:%s ...
+
+  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
+     Group Id = org.apache.spark, Artifact Id = spark-streaming-kinesis-asl-assembly, Version = %s.
+     Then, include the jar in the spark-submit command as
+
+     $ bin/spark-submit --jars <spark-streaming-kinesis-asl-assembly.jar> ...
+
+________________________________________________________________________________________________
+
+""" % (sc.version, sc.version))
+
+
+class InitialPositionInStream(object):
+    LATEST, TRIM_HORIZON = (0, 1)
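
As a reading aid before the test diff below, here is a minimal usage sketch of the new API. The
app name "KinesisWordCount", the stream name "myStream", the region, and the 2-second batch and
checkpoint intervals are illustrative assumptions, not values from this commit; credentials fall
back to the DefaultAWSCredentialsProviderChain because awsAccessKeyId and awsSecretKey are omitted.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    sc = SparkContext(appName="KinesisWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    # Start at the tip of the stream; checkpoint to DynamoDB every 2 seconds
    lines = KinesisUtils.createStream(
        ssc, "KinesisWordCount", "myStream",
        "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, 2)
    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

This only works when the spark-streaming-kinesis-asl assembly jar is on the classpath; otherwise
createStream hits the ClassNotFoundException path and prints the message from _printErrorMsg above.
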
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4ecae1e4bf..5cd544b214 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -36,9 +36,11 @@ else:
import unittest
from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.streaming.flume import FlumeUtils
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
class PySparkStreamingTestCase(unittest.TestCase):
@@ -891,6 +893,67 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
        self._testMultipleTimes(self._testFlumePollingMultipleHosts)
+class KinesisStreamTests(PySparkStreamingTestCase):
+
+    def test_kinesis_stream_api(self):
+        # Don't start the StreamingContext because we cannot test it in Jenkins
+        kinesisStream1 = KinesisUtils.createStream(
+            self.ssc, "myAppName", "mySparkStream",
+            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
+        kinesisStream2 = KinesisUtils.createStream(
+            self.ssc, "myAppName", "mySparkStream",
+            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
+            "awsAccessKey", "awsSecretKey")
+
+    def test_kinesis_stream(self):
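+        # Opt-in test: it creates and deletes real Kinesis and DynamoDB resources on AWS,
+        # so it runs only when ENABLE_KINESIS_TESTS is set to 1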
+        if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
+            print("Skip test_kinesis_stream")
+            return
+
+        import random
+        kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
+        kinesisTestUtilsClz = \
+            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
+                .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
+        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+        try:
+            kinesisTestUtils.createStream()
+            aWSCredentials = kinesisTestUtils.getAWSCredentials()
+            stream = KinesisUtils.createStream(
+                self.ssc, kinesisAppName, kinesisTestUtils.streamName(),
+                kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(),
+                InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
+                aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey())
+
+            outputBuffer = []
+
+            def get_output(_, rdd):
+                for e in rdd.collect():
+                    outputBuffer.append(e)
+
+            stream.foreachRDD(get_output)
+            self.ssc.start()
+
+            testData = [i for i in range(1, 11)]
+            expectedOutput = set([str(i) for i in testData])
+            start_time = time.time()
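+            # Poll for up to 120 seconds, re-pushing the test data each round, since
+            # records can take a while to propagate through Kinesis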
+            while time.time() - start_time < 120:
+                kinesisTestUtils.pushData(testData)
+                if expectedOutput == set(outputBuffer):
+                    break
+                time.sleep(10)
+            self.assertEqual(expectedOutput, set(outputBuffer))
+        except:
+            import traceback
+            traceback.print_exc()
+            raise
+        finally:
+            kinesisTestUtils.deleteStream()
+            kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
+
def search_kafka_assembly_jar():
    SPARK_HOME = os.environ["SPARK_HOME"]
    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
@@ -926,10 +989,31 @@ def search_flume_assembly_jar():
    else:
        return jars[0]
+
+def search_kinesis_asl_assembly_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
+    jars = glob.glob(
+        os.path.join(kinesis_asl_assembly_dir,
+                     "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
+             kinesis_asl_assembly_dir) + "You need to build Spark with "
+            "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
+            "or 'build/mvn -Pkinesis-asl package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
+                         "remove all but one") % kinesis_asl_assembly_dir)
+    else:
+        return jars[0]
+
+
if __name__ == "__main__":
    kafka_assembly_jar = search_kafka_assembly_jar()
    flume_assembly_jar = search_flume_assembly_jar()
-    jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+    kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
+    jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
    unittest.main()
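
Note: of the two tests added above, test_kinesis_stream_api always runs but never starts the
StreamingContext, so it exercises only the Python-to-JVM plumbing; test_kinesis_stream talks to
real AWS Kinesis and DynamoDB and is therefore gated behind ENABLE_KINESIS_TESTS=1.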