about summary refs log tree commit diff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-31 12:09:48 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-31 12:09:48 -0700
commit3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch)
tree5f362cf13352f4a06ea05e2f3221674147587e75 /python/pyspark/streaming/tests.py
parent39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff)
downloadspark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.gz
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.bz2
spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.zip
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test. Author: zsxwing <zsxwing@gmail.com> Closes #6955 from zsxwing/kinesis-python and squashes the following commits: e42e471 [zsxwing] Merge branch 'master' into kinesis-python 455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module 32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 5082d28 [zsxwing] Fix the syntax error for Python 2.6 fca416b [zsxwing] Fix wrong comparison 96670ff [zsxwing] Fix the compilation error after merging master 756a128 [zsxwing] Merge branch 'master' into kinesis-python 6c37395 [zsxwing] Print stack trace for debug 7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS cc9d071 [zsxwing] Fix the python test errors 466b425 [zsxwing] Add python tests for Kinesis e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python 3da2601 [zsxwing] Fix the kinesis folder 687446b [zsxwing] Fix the error message and the maven output path add2beb [zsxwing] Merge branch 'master' into kinesis-python 4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py86
1 file changed, 85 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4ecae1e4bf..5cd544b214 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -36,9 +36,11 @@ else:
import unittest
from pyspark.context import SparkConf, SparkContext, RDD
+from pyspark.storagelevel import StorageLevel
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.streaming.flume import FlumeUtils
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
class PySparkStreamingTestCase(unittest.TestCase):
@@ -891,6 +893,67 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
self._testMultipleTimes(self._testFlumePollingMultipleHosts)
class KinesisStreamTests(PySparkStreamingTestCase):
    """Tests for the Python Kinesis streaming API.

    ``test_kinesis_stream`` talks to a real AWS Kinesis stream and is
    therefore opt-in: it only runs when the ENABLE_KINESIS_TESTS environment
    variable is set to '1'. The API-surface test always runs.
    """

    def test_kinesis_stream_api(self):
        """Exercise both createStream overloads without starting the context.

        The StreamingContext cannot be started in Jenkins, so this only
        checks that the Python API wires its arguments through to the JVM.
        """
        # Overload relying on the default AWS credentials provider chain.
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2)
        # Overload taking explicit AWS access/secret keys.
        KinesisUtils.createStream(
            self.ssc, "myAppNam", "mySparkStream",
            "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
            InitialPositionInStream.LATEST, 2, StorageLevel.MEMORY_AND_DISK_2,
            "awsAccessKey", "awsSecretKey")

    def test_kinesis_stream(self):
        """End-to-end test against a real Kinesis stream (opt-in only).

        Creates a temporary stream via the JVM-side KinesisTestUtils, pushes
        test data into it, and verifies the data arrives through the Python
        DStream within a two-minute window.
        """
        if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
            print("Skip test_kinesis_stream")
            return

        import random
        kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
        # Load the helper through the context class loader so it is resolved
        # from the spark-streaming-kinesis-asl assembly jar added via --jars.
        kinesisTestUtilsClz = \
            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
                .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
        try:
            kinesisTestUtils.createStream()
            aWSCredentials = kinesisTestUtils.getAWSCredentials()
            stream = KinesisUtils.createStream(
                self.ssc, kinesisAppName, kinesisTestUtils.streamName(),
                kinesisTestUtils.endpointUrl(), kinesisTestUtils.regionName(),
                InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_ONLY,
                aWSCredentials.getAWSAccessKeyId(), aWSCredentials.getAWSSecretKey())

            outputBuffer = []

            def get_output(_, rdd):
                # Collect each batch into the shared buffer so the polling
                # loop below can inspect what has arrived so far.
                for e in rdd.collect():
                    outputBuffer.append(e)

            stream.foreachRDD(get_output)
            self.ssc.start()

            testData = list(range(1, 11))
            expectedOutput = set(str(i) for i in testData)
            start_time = time.time()
            # Keep re-pushing the same records until they all show up or we
            # hit the two-minute deadline; Kinesis delivery can lag.
            while time.time() - start_time < 120:
                kinesisTestUtils.pushData(testData)
                if expectedOutput == set(outputBuffer):
                    break
                time.sleep(10)
            self.assertEqual(expectedOutput, set(outputBuffer))
        except Exception:
            # Print the stack trace before re-raising so the Jenkins log
            # always shows the original failure for debugging.
            import traceback
            traceback.print_exc()
            raise
        finally:
            # Always release the AWS resources we created, even on failure.
            kinesisTestUtils.deleteStream()
            kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+
def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
@@ -926,10 +989,31 @@ def search_flume_assembly_jar():
else:
return jars[0]
+
def search_kinesis_asl_assembly_jar():
    """Return the path of the spark-streaming-kinesis-asl assembly jar.

    Looks under $SPARK_HOME/extras/kinesis-asl-assembly and raises if the
    jar is missing (Spark not built with -Pkinesis-asl) or ambiguous
    (stale builds for several Scala versions lying around).
    """
    spark_home = os.environ["SPARK_HOME"]
    asl_dir = os.path.join(spark_home, "extras/kinesis-asl-assembly")
    pattern = os.path.join(
        asl_dir, "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar")
    found = glob.glob(pattern)
    if len(found) == 1:
        return found[0]
    if not found:
        raise Exception(
            ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
             asl_dir) + "You need to build Spark with "
            "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
            "or 'build/mvn -Pkinesis-asl package' before running this test")
    raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
                     "remove all but one") % asl_dir)
+
+
if __name__ == "__main__":
    # Locate the Kafka, Flume and Kinesis assembly jars and hand them to the
    # test JVM through spark-submit before running the unittest suite.
    kafka_jar = search_kafka_assembly_jar()
    flume_jar = search_flume_assembly_jar()
    kinesis_jar = search_kinesis_asl_assembly_jar()
    extra_jars = "%s,%s,%s" % (kafka_jar, flume_jar, kinesis_jar)
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % extra_jars
    unittest.main()