aboutsummaryrefslogtreecommitdiff
path: root/project
diff options
context:
space:
mode:
authorPrabeesh K <prabsmails@gmail.com>2015-08-10 16:33:23 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-10 16:33:23 -0700
commit853809e948e7c5092643587a30738115b6591a59 (patch)
treeb778c6afe2627f6fc67db93a413f59a2ef480017 /project
parentc4fd2a242228ee101904770446e3f37d49e39b76 (diff)
downloadspark-853809e948e7c5092643587a30738115b6591a59.tar.gz
spark-853809e948e7c5092643587a30738115b6591a59.tar.bz2
spark-853809e948e7c5092643587a30738115b6591a59.zip
[SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python
This PR is based on #4229, thanks prabeesh. Closes #4229 Author: Prabeesh K <prabsmails@gmail.com> Author: zsxwing <zsxwing@gmail.com> Author: prabs <prabsmails@gmail.com> Author: Prabeesh K <prabeesh.k@namshi.com> Closes #7833 from zsxwing/pr4229 and squashes the following commits: 9570bec [zsxwing] Fix the variable name and check null in finally 4a9c79e [zsxwing] Fix pom.xml indentation abf5f18 [zsxwing] Merge branch 'master' into pr4229 935615c [zsxwing] Fix the flaky MQTT tests 47278c5 [zsxwing] Include the project class files 478f844 [zsxwing] Add unpack 5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests 734db99 [zsxwing] Merge branch 'master' into pr4229 126608a [Prabeesh K] address the comments b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229 d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test a6747cb [Prabeesh K] wait for starting the receiver before publishing data 87fc677 [Prabeesh K] address the comments: 97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt 80474d1 [Prabeesh K] fix 1f0cfe9 [Prabeesh K] python style fix e1ee016 [Prabeesh K] scala style fix a5a8f9f [Prabeesh K] added Python test 9767d82 [Prabeesh K] implemented Python-friendly class a11968b [Prabeesh K] fixed python style 795ec27 [Prabeesh K] address comments ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly 3f4df12 [Prabeesh K] updated version b34c3c1 [prabs] adress comments 3aa7fff [prabs] Added Python streaming mqtt word count example b7d42ff [prabs] Mqtt streaming support in Python
Diffstat (limited to 'project')
-rw-r--r--project/SparkBuild.scala12
1 files changed, 9 insertions, 3 deletions
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9a33baa7c6..41a85fa9de 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -45,8 +45,8 @@ object BuildCommons {
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
"kinesis-asl").map(ProjectRef(buildLocation, _))
- val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) =
- Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly")
+ val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly, streamingKinesisAslAssembly) =
+ Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly", "streaming-kinesis-asl-assembly")
.map(ProjectRef(buildLocation, _))
val tools = ProjectRef(buildLocation, "tools")
@@ -212,6 +212,9 @@ object SparkBuild extends PomBuild {
/* Enable Assembly for all assembly projects */
assemblyProjects.foreach(enable(Assembly.settings))
+ /* Enable Assembly for streamingMqtt test */
+ enable(inConfig(Test)(Assembly.settings))(streamingMqtt)
+
/* Package pyspark artifacts in a separate zip file for YARN. */
enable(PySparkAssembly.settings)(assembly)
@@ -382,13 +385,16 @@ object Assembly {
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
},
jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
- if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
+ if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly") || mName.contains("streaming-kinesis-asl-assembly")) {
// This must match the same name used in maven (see external/kafka-assembly/pom.xml)
s"${mName}-${v}.jar"
} else {
s"${mName}-${v}-hadoop${hv}.jar"
}
},
+ jarName in (Test, assembly) <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) =>
+ s"${mName}-test-${v}.jar"
+ },
mergeStrategy in assembly := {
case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard