author     Tathagata Das <tathagata.das1565@gmail.com>  2015-08-10 23:41:53 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-08-10 23:41:53 -0700
commit     0f90d6055e5bea9ceb1d454db84f4aa1d59b284d (patch)
tree       d3f5e1b50313eb76559f3a31be01b045bf2976e9 /python
parent     91e9389f39509e63654bd4bcb7bd919eaedda910 (diff)
[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the Kinesis assembly JAR has not been generated
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7961 from tdas/SPARK-9640 and squashes the following commits:

974ce19 [Tathagata Das] Undo changes related to SPARK-9727
004ae26 [Tathagata Das] style fixes
9bbb97d [Tathagata Das] Minor style fies
e6a677e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9640
ca90719 [Tathagata Das] Removed extra line
ba9cfc7 [Tathagata Das] Improved kinesis test selection logic
88d59bd [Tathagata Das] updated test modules
871fcc8 [Tathagata Das] Fixed SparkBuild
94be631 [Tathagata Das] Fixed style
b858196 [Tathagata Das] Fixed conditions and few other things based on PR comments.
e292e64 [Tathagata Das] Added filters for Kinesis python tests
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/tests.py  |  56
1 file changed, 44 insertions(+), 12 deletions(-)
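
Before the diff itself, here is a minimal standalone sketch of the pattern this patch introduces: the Kinesis ASL assembly search now returns None instead of raising when the optional assembly was not built, and the __main__ block only adds the Kinesis test case to the list of test cases when that jar is present. The directory path, glob pattern, and test-class names below are illustrative placeholders for this sketch, not the actual tests.py code.

    import glob
    import os
    import sys
    import unittest

    def search_optional_assembly_jar(assembly_dir, pattern):
        """Return the single matching jar, None if it was never built,
        or raise if the build left multiple candidates behind."""
        jars = glob.glob(os.path.join(assembly_dir, pattern))
        if not jars:
            return None  # optional module was not compiled; not an error
        elif len(jars) > 1:
            raise Exception("Found multiple assembly JARs in %s; please "
                            "remove all but one" % assembly_dir)
        return jars[0]

    class AlwaysRunTests(unittest.TestCase):
        def test_basic(self):
            self.assertEqual(1 + 1, 2)

    class OptionalKinesisLikeTests(unittest.TestCase):
        def test_needs_assembly(self):
            self.assertTrue(True)

    if __name__ == "__main__":
        # Hypothetical path/pattern, standing in for the Kinesis ASL assembly.
        jar = search_optional_assembly_jar(
            "external/kinesis-asl-assembly", "target/scala-*/*assembly*.jar")

        testcases = [AlwaysRunTests]
        if jar is not None:
            testcases.append(OptionalKinesisLikeTests)
        else:
            sys.stderr.write("Skipping optional tests: assembly jar not built\n")

        for testcase in testcases:
            suite = unittest.TestLoader().loadTestsFromTestCase(testcase)
            unittest.TextTestRunner(verbosity=2).run(suite)

The design choice visible in the diff below is the same: a missing optional jar is treated as an expected configuration (fewer test cases are selected), while an ambiguous build with multiple jars is still an error.
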
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 66ae3345f4..f0ed415f97 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -971,8 +971,10 @@ class KinesisStreamTests(PySparkStreamingTestCase):
"awsAccessKey", "awsSecretKey")
def test_kinesis_stream(self):
- if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
- print("Skip test_kinesis_stream")
+ if not are_kinesis_tests_enabled:
+ sys.stderr.write(
+ "Skipped test_kinesis_stream (enable by setting environment variable %s=1"
+ % kinesis_test_environ_var)
return
import random
@@ -1013,6 +1015,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
traceback.print_exc()
raise
finally:
+ self.ssc.stop(False)
kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
@@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
- "'build/mvn package' before running this test")
+ "'build/mvn package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
"remove all but one") % kafka_assembly_dir)
@@ -1045,7 +1048,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
- "'build/mvn package' before running this test")
+ "'build/mvn package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
"remove all but one") % flume_assembly_dir)
@@ -1095,11 +1098,7 @@ def search_kinesis_asl_assembly_jar():
os.path.join(kinesis_asl_assembly_dir,
"target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
if not jars:
- raise Exception(
- ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
- kinesis_asl_assembly_dir) + "You need to build Spark with "
- "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
- "or 'build/mvn -Pkinesis-asl package' before running this test")
+ return None
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
"remove all but one") % kinesis_asl_assembly_dir)
@@ -1107,6 +1106,10 @@ def search_kinesis_asl_assembly_jar():
return jars[0]
+# Must be same as the variable and condition defined in KinesisTestUtils.scala
+kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
+are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
+
if __name__ == "__main__":
kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar()
@@ -1114,8 +1117,37 @@ if __name__ == "__main__":
mqtt_test_jar = search_mqtt_test_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
- jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar,
- mqtt_assembly_jar, mqtt_test_jar)
+ if kinesis_asl_assembly_jar is None:
+ kinesis_jar_present = False
+ jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
+ mqtt_test_jar)
+ else:
+ kinesis_jar_present = True
+ jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
+ mqtt_test_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
- unittest.main()
+ testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
+ CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
+
+ if kinesis_jar_present is True:
+ testcases.append(KinesisStreamTests)
+ elif are_kinesis_tests_enabled is False:
+ sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was "
+ "not compiled with -Pkinesis-asl profile. To run these tests, "
+ "you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly "
+ "streaming-kinesis-asl-assembly/assembly' or "
+ "'build/mvn -Pkinesis-asl package' before running this test.")
+ else:
+ raise Exception(
+ ("Failed to find Spark Streaming Kinesis assembly jar in %s. "
+ % kinesis_asl_assembly_dir) +
+ "You need to build Spark with 'build/sbt -Pkinesis-asl "
+ "assembly/assembly streaming-kinesis-asl-assembly/assembly'"
+ "or 'build/mvn -Pkinesis-asl package' before running this test.")
+
+ sys.stderr.write("Running tests: %s \n" % (str(testcases)))
+ for testcase in testcases:
+ sys.stderr.write("[Running %s]\n" % (testcase))
+ tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
+ unittest.TextTestRunner(verbosity=2).run(tests)
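
The jar check above only decides whether KinesisStreamTests is loaded at all; the first hunk adds a second, independent gate inside test_kinesis_stream that skips the test unless ENABLE_KINESIS_TESTS=1 is exported, matching the variable checked in KinesisTestUtils.scala. A hedged, self-contained sketch of that guard follows; the class name is illustrative, and the closing parenthesis and trailing newline in the skip message are additions of this sketch, not of the committed string.

    import os
    import sys
    import unittest

    # Must stay in sync with the variable checked by the Scala-side test utilities.
    KINESIS_TEST_ENVIRON_VAR = "ENABLE_KINESIS_TESTS"
    are_kinesis_tests_enabled = os.environ.get(KINESIS_TEST_ENVIRON_VAR) == '1'

    class KinesisLikeStreamTests(unittest.TestCase):
        def test_kinesis_stream(self):
            if not are_kinesis_tests_enabled:
                sys.stderr.write(
                    "Skipped test_kinesis_stream (enable by setting environment "
                    "variable %s=1)\n" % KINESIS_TEST_ENVIRON_VAR)
                return
            # The real test would create and exercise a Kinesis stream here.
            self.assertTrue(are_kinesis_tests_enabled)

    if __name__ == "__main__":
        unittest.main(verbosity=2)

With the variable unset, the test writes the notice to stderr and returns (unittest still reports it as ok); exporting ENABLE_KINESIS_TESTS=1 lets the real test body run.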