Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--  python/pyspark/streaming/tests.py | 47
1 file changed, 26 insertions(+), 21 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 510a4f2b3e..cfea95b0de 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase):
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+# Search the project dir for a jar matching name_prefix. Check both the sbt and the
+# maven build outputs, because the two builds place artifact jars in different directories.
+def search_jar(dir, name_prefix):
+ # We should ignore the following jars
+ ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+ jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) + # sbt build
+ glob.glob(os.path.join(dir, "target/" + name_prefix + "_*.jar"))) # maven build
+ return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)]
+
+
def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
- jars = glob.glob(
- os.path.join(kafka_assembly_dir, "target/scala-*/spark-streaming-kafka-assembly-*.jar"))
+ jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
@@ -1174,8 +1183,8 @@ def search_kafka_assembly_jar():
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
- "remove all but one") % kafka_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
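
A quick aside on the suffix filter introduced in the new search_jar helper above:
str.endswith accepts a tuple of suffixes, so a single call covers every ignored jar
kind. A minimal sketch of that behavior, with made-up file names:

    candidates = ["spark-streaming-kafka-assembly-1.5.0.jar",
                  "spark-streaming-kafka-assembly-1.5.0-sources.jar",
                  "spark-streaming-kafka-assembly-1.5.0-javadoc.jar"]
    ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
    kept = [jar for jar in candidates if not jar.endswith(ignored_jar_suffixes)]
    # kept == ["spark-streaming-kafka-assembly-1.5.0.jar"]
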
@@ -1183,8 +1192,7 @@ def search_kafka_assembly_jar():
def search_flume_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
- jars = glob.glob(
- os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+ jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
@@ -1192,8 +1200,8 @@ def search_flume_assembly_jar():
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
- "remove all but one") % flume_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1201,8 +1209,7 @@ def search_flume_assembly_jar():
def search_mqtt_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
- jars = glob.glob(
- os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar"))
+ jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
@@ -1210,8 +1217,8 @@ def search_mqtt_assembly_jar():
"'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
"'build/mvn package' before running this test")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in %s; please "
- "remove all but one") % mqtt_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1227,8 +1234,8 @@ def search_mqtt_test_jar():
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please "
- "remove all but one") % mqtt_test_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1236,14 +1243,12 @@ def search_mqtt_test_jar():
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
- jars = glob.glob(
- os.path.join(kinesis_asl_assembly_dir,
- "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+ jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly")
if not jars:
return None
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
- "remove all but one") % kinesis_asl_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1269,8 +1274,8 @@ if __name__ == "__main__":
mqtt_test_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
- testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
- CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
+ testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
+ KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
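
A note on the submit wiring above: PYSPARK_SUBMIT_ARGS is the environment variable
PySpark consults when launching the JVM gateway, so joining the located assembly jar
paths with commas and passing them via --jars puts them on the classpath for these
tests. A minimal sketch, with hypothetical jar paths:

    import os

    jar_paths = [
        "/spark/external/kafka-assembly/target/scala-2.10/spark-streaming-kafka-assembly-1.5.0.jar",  # hypothetical
        "/spark/external/flume-assembly/target/scala-2.10/spark-streaming-flume-assembly-1.5.0.jar",  # hypothetical
    ]
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % ",".join(jar_paths)
    # Any SparkContext created after this point is launched with these jars available.
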