aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author zsxwing <zsxwing@gmail.com> 2015-08-24 12:38:01 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-08-24 12:38:10 -0700
commit 36bc50c8d377f3e628f7d608d58a76ea508e9697 (patch)
tree fe37fbdd7ee03cdb270bf0d84abecc8e24f3e281
parent b40059dbda4dafbb883a53fbd5c5f69bc01a3e19 (diff)
download spark-36bc50c8d377f3e628f7d608d58a76ea508e9697.tar.gz
spark-36bc50c8d377f3e628f7d608d58a76ea508e9697.tar.bz2
spark-36bc50c8d377f3e628f7d608d58a76ea508e9697.zip
[SPARK-10168] [STREAMING] Fix the issue that maven publishes wrong artifact jars
This PR removed the `outputFile` configuration from pom.xml and updated `tests.py` to search jars for both sbt build and maven build.

I ran `mvn -Pkinesis-asl -DskipTests clean install` locally, and verified the jars in my local repository were correct. I also checked the Python tests for the maven build, and they all passed.

Author: zsxwing <zsxwing@gmail.com>

Closes #8373 from zsxwing/SPARK-10168 and squashes the following commits:

e0b5818 [zsxwing] Fix the sbt build
c697627 [zsxwing] Add the jar pathes to the exception message
be1d8a5 [zsxwing] Fix the issue that maven publishes wrong artifact jars

(cherry picked from commit 4e0395ddb764d092b5b38447af49e196e590e0f0)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r-- external/flume-assembly/pom.xml 1
-rw-r--r-- external/kafka-assembly/pom.xml 1
-rw-r--r-- external/mqtt-assembly/pom.xml 1
-rw-r--r-- extras/kinesis-asl-assembly/pom.xml 1
-rw-r--r-- python/pyspark/streaming/tests.py 47
5 files changed, 26 insertions, 25 deletions
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index e05e431896..561ed4babe 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -115,7 +115,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 36342f37bb..6f4e2a89e9 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -142,7 +142,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
index f3e3f93e7e..8412600633 100644
--- a/external/mqtt-assembly/pom.xml
+++ b/external/mqtt-assembly/pom.xml
@@ -132,7 +132,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml
index 3ca538608f..51af3e6f22 100644
--- a/extras/kinesis-asl-assembly/pom.xml
+++ b/extras/kinesis-asl-assembly/pom.xml
@@ -137,7 +137,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 510a4f2b3e..cfea95b0de 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1162,11 +1162,20 @@ class KinesisStreamTests(PySparkStreamingTestCase):
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
+# Search jar in the project dir using the jar name_prefix for both sbt build and maven build because
+# the artifact jars are in different directories.
+def search_jar(dir, name_prefix):
+ # We should ignore the following jars
+ ignored_jar_suffixes = ("javadoc.jar", "sources.jar", "test-sources.jar", "tests.jar")
+ jars = (glob.glob(os.path.join(dir, "target/scala-*/" + name_prefix + "-*.jar")) + # sbt build
+ glob.glob(os.path.join(dir, "target/" + name_prefix + "_*.jar"))) # maven build
+ return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)]
+
+
def search_kafka_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
- jars = glob.glob(
- os.path.join(kafka_assembly_dir, "target/scala-*/spark-streaming-kafka-assembly-*.jar"))
+ jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
@@ -1174,8 +1183,8 @@ def search_kafka_assembly_jar():
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
- "remove all but one") % kafka_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1183,8 +1192,7 @@ def search_kafka_assembly_jar():
def search_flume_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
- jars = glob.glob(
- os.path.join(flume_assembly_dir, "target/scala-*/spark-streaming-flume-assembly-*.jar"))
+ jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
@@ -1192,8 +1200,8 @@ def search_flume_assembly_jar():
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test.")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
- "remove all but one") % flume_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1201,8 +1209,7 @@ def search_flume_assembly_jar():
def search_mqtt_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
mqtt_assembly_dir = os.path.join(SPARK_HOME, "external/mqtt-assembly")
- jars = glob.glob(
- os.path.join(mqtt_assembly_dir, "target/scala-*/spark-streaming-mqtt-assembly-*.jar"))
+ jars = search_jar(mqtt_assembly_dir, "spark-streaming-mqtt-assembly")
if not jars:
raise Exception(
("Failed to find Spark Streaming MQTT assembly jar in %s. " % mqtt_assembly_dir) +
@@ -1210,8 +1217,8 @@ def search_mqtt_assembly_jar():
"'build/sbt assembly/assembly streaming-mqtt-assembly/assembly' or "
"'build/mvn package' before running this test")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT assembly JARs in %s; please "
- "remove all but one") % mqtt_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1227,8 +1234,8 @@ def search_mqtt_test_jar():
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please "
- "remove all but one") % mqtt_test_dir)
+ raise Exception(("Found multiple Spark Streaming MQTT test JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1236,14 +1243,12 @@ def search_mqtt_test_jar():
def search_kinesis_asl_assembly_jar():
SPARK_HOME = os.environ["SPARK_HOME"]
kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "extras/kinesis-asl-assembly")
- jars = glob.glob(
- os.path.join(kinesis_asl_assembly_dir,
- "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
+ jars = search_jar(kinesis_asl_assembly_dir, "spark-streaming-kinesis-asl-assembly")
if not jars:
return None
elif len(jars) > 1:
- raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
- "remove all but one") % kinesis_asl_assembly_dir)
+ raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs: %s; please "
+ "remove all but one") % (", ".join(jars)))
else:
return jars[0]
@@ -1269,8 +1274,8 @@ if __name__ == "__main__":
mqtt_test_jar, kinesis_asl_assembly_jar)
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
- testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
- CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
+ testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
+ KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)