diff options
author | zsxwing <zsxwing@gmail.com> | 2015-07-31 12:09:48 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-31 12:09:48 -0700 |
commit | 3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch) | |
tree | 5f362cf13352f4a06ea05e2f3221674147587e75 /python/pyspark/streaming/tests.py | |
parent | 39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff) | |
download | spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.gz spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.bz2 spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.zip |
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test.
Author: zsxwing <zsxwing@gmail.com>
Closes #6955 from zsxwing/kinesis-python and squashes the following commits:
e42e471 [zsxwing] Merge branch 'master' into kinesis-python
455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module
32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
5082d28 [zsxwing] Fix the syntax error for Python 2.6
fca416b [zsxwing] Fix wrong comparison
96670ff [zsxwing] Fix the compilation error after merging master
756a128 [zsxwing] Merge branch 'master' into kinesis-python
6c37395 [zsxwing] Print stack trace for debug
7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS
cc9d071 [zsxwing] Fix the python test errors
466b425 [zsxwing] Add python tests for Kinesis
e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
3da2601 [zsxwing] Fix the kinesis folder
687446b [zsxwing] Fix the error message and the maven output path
add2beb [zsxwing] Merge branch 'master' into kinesis-python
4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- | python/pyspark/streaming/tests.py | 86 |
1 files changed, 85 insertions, 1 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 4ecae1e4bf..5cd544b214 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -36,9 +36,11 @@ else: import unittest from pyspark.context import SparkConf, SparkContext, RDD +from pyspark.storagelevel import StorageLevel from pyspark.streaming.context import StreamingContext from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition from pyspark.streaming.flume import FlumeUtils +from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream class PySparkStreamingTestCase(unittest.TestCase): @@ -891,6 +893,67 @@ class FlumePollingStreamTests(PySparkStreamingTestCase): self._testMultipleTimes(self._testFlumePollingMultipleHosts) +class KinesisStreamTests(PySparkStreamingTestCase): + + def test_kinesis_stream_api(self): + # Don't start the StreamingContext because we cannot test it in Jenkins + kinesisStream1 = KinesisUtils.createStream( + self.ssc, "myAppNam", "mySparkStream", + "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2) + kinesisStream2 = KinesisUtils.createStream( + self.ssc, "myAppNam", "mySparkStream", + "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2, + "awsAccessKey", "awsSecretKey") + + def test_kinesis_stream(self): + if os.environ.get('ENABLE_KINESIS_TESTS') != '1': + print("Skip test_kinesis_stream") + return + + import random + kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000))) + kinesisTestUtilsClz = \ + self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils") + kinesisTestUtils = kinesisTestUtilsClz.newInstance() + try: + kinesisTestUtils.createStream() + aWSCredentials = kinesisTestUtils.getAWSCredentials() + stream = KinesisUtils.createStream( + self.ssc, kinesisAppName, kinesisTestUtils.streamName(), + kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(), + InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY, + aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey()) + + outputBuffer = [] + + def get_output(_, rdd): + for e in rdd.collect(): + outputBuffer.append(e) + + stream.foreachRDD(get_output) + self.ssc.start() + + testData = [i for i in range(1, 11)] + expectedOutput = set([str(i) for i in testData]) + start_time = time.time() + while time.time() - start_time < 120: + kinesisTestUtils.pushData(testData) + if expectedOutput == set(outputBuffer): + break + time.sleep(10) + self.assertEqual(expectedOutput, set(outputBuffer)) + except: + import traceback + traceback.print_exc() + raise + finally: + kinesisTestUtils.deleteStream() + kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) + + def search_kafka_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly") @@ -926,10 +989,31 @@ def search_flume_assembly_jar(): else: return jars[0] + +def search_kinesis_asl_assembly_jar(): + SPARK_HOME = os.environ["SPARK_HOME"] + kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly") + jars = glob.glob( + os.path.join(kinesis_asl_assembly_dir, + "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar")) + if not jars: + raise Exception( + ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " % + kinesis_asl_assembly_dir) + "You need to build Spark with " + "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' " + "or 'build/mvn -Pkinesis-asl package' before running this test") + elif len(jars) > 1: + raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please " + "remove all but one") % kinesis_asl_assembly_dir) + else: + return jars[0] + + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar() - jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar) + kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() + jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars unittest.main() |