aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-07 22:21:52 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-07 22:21:52 -0800
commitc0f0155eca6405d0768a476f0be00594e478fce0 (patch)
tree0ceb61d8adf28cc47291e055ff0fdc503766f571
parentf5f12dc28218f3ed89836434ab0530e88b043e47 (diff)
parent8f02f1c3d45ee553ed6bec2dc81fbae4435274fc (diff)
downloadspark-c0f0155eca6405d0768a476f0be00594e478fce0.tar.gz
spark-c0f0155eca6405d0768a476f0be00594e478fce0.tar.bz2
spark-c0f0155eca6405d0768a476f0be00594e478fce0.zip
Merge pull request #313 from tdas/project-refactor
Refactored the streaming project to separate external libraries like Twitter, Kafka, Flume, etc. At a high level, these are the following changes. 1. All the external code was put in `SPARK_HOME/external/` as separate SBT projects and Maven modules. Their artifact names are `spark-streaming-twitter`, `spark-streaming-kafka`, etc. Both SparkBuild.scala and pom.xml files have been updated. References to external libraries and repositories have been removed from the settings of root and streaming projects/modules. 2. To avail the external functionality (say, creating a Twitter stream), the developer has to `import org.apache.spark.streaming.twitter._` . For Scala API, the developer has to call `TwitterUtils.createStream(streamingContext, ...)`. For the Java API, the developer has to call `TwitterUtils.createStream(javaStreamingContext, ...)`. 3. Each external project has its own scala and java unit tests. Note the unit tests of each external library use classes of the streaming unit tests (`TestSuiteBase`, `LocalJavaStreamingContext`, etc.). To enable this code sharing among test classes, `dependsOn(streaming % "compile->compile,test->test")` was used in the SparkBuild.scala . In the streaming/pom.xml, an additional `maven-jar-plugin` was necessary to capture this dependency (see comment inside the pom.xml for more information). 4. Jars of the external projects have been added to examples project but not to the assembly project. 5. In some files, imports have been rearrange to conform to the Spark coding guidelines.
-rwxr-xr-xbin/run-example7
-rw-r--r--examples/pom.xml42
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java10
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala12
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala6
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala9
-rw-r--r--external/flume/pom.xml93
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala)5
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala70
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java34
-rw-r--r--external/flume/src/test/resources/log4j.properties29
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala86
-rw-r--r--external/kafka/pom.xml97
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala)13
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala153
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java45
-rw-r--r--external/kafka/src/test/resources/log4j.properties29
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala39
-rw-r--r--external/mqtt/pom.xml108
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala)28
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala75
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java37
-rw-r--r--external/mqtt/src/test/resources/log4j.properties29
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala36
-rw-r--r--external/twitter/pom.xml89
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala)11
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala126
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java46
-rw-r--r--external/twitter/src/test/resources/log4j.properties29
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala43
-rw-r--r--external/zeromq/pom.xml89
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala)3
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala126
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java50
-rw-r--r--external/zeromq/src/test/resources/log4j.properties29
-rw-r--r--external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala44
-rw-r--r--pom.xml29
-rw-r--r--project/SparkBuild.scala93
-rw-r--r--streaming/pom.xml83
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala145
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala243
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala4
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java104
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java46
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala82
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala20
51 files changed, 1907 insertions, 739 deletions
diff --git a/bin/run-example b/bin/run-example
index 6c5d4a6a8f..2e9d51440b 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -45,20 +45,15 @@ fi
EXAMPLES_DIR="$FWDIR"/examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then
- # Use the JAR from the SBT build
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar`
fi
-if [ -e "$EXAMPLES_DIR"/target/spark-examples*[0-9Tg].jar ]; then
- # Use the JAR from the Maven build
- # TODO: this also needs to become an assembly!
- export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/spark-examples*[0-9Tg].jar`
-fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
exit 1
fi
+
# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
diff --git a/examples/pom.xml b/examples/pom.xml
index 7e41bef252..cb4f7ee33b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -72,6 +72,31 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
@@ -87,21 +112,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>com.sksamuel.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.0-beta1</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -174,7 +184,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index fd683ce0d3..b11cfa667e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@@ -52,11 +53,10 @@ public final class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+ JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
@@ -67,6 +67,6 @@ public final class JavaFlumeEventCount {
}
}).print();
- sc.start();
+ ssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index d8b4f4dddd..16b8a948e6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
@@ -59,7 +60,7 @@ public final class JavaKafkaWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
@@ -70,7 +71,7 @@ public final class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+ JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
@@ -100,6 +101,6 @@ public final class JavaKafkaWordCount {
});
wordCounts.print();
- ssc.start();
+ jssc.start();
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 5ef1928294..ae3709b3d9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
/**
* Produces a count of events received from Flume.
@@ -51,7 +52,7 @@ object FlumeEventCount {
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+ val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 197461655e..31a94bd224 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -24,6 +24,7 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.RawTextHelper._
+import org.apache.spark.streaming.kafka._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -52,7 +53,7 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 2d02ef77c0..325290b66f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,11 +17,6 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.MQTTReceiver
-import org.apache.spark.storage.StorageLevel
-
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.mqtt._
+
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
* Space separated String Message "hello mqtt demo for spark streaming"
@@ -97,7 +97,7 @@ object MQTTWordCount {
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
StreamingContext.jarOfClass(this.getClass))
- val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+ val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 9d21d3178f..3ccdc908e2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -23,6 +23,8 @@ import com.twitter.algebird._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -33,7 +35,7 @@ import org.apache.spark.SparkContext._
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
@@ -61,7 +63,7 @@ object TwitterAlgebirdCMS {
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 5111e6f62a..c7e83e76b0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.streaming.dstream.TwitterInputDStream
+import org.apache.spark.streaming.twitter._
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
@@ -50,7 +50,7 @@ object TwitterAlgebirdHLL {
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 7a3df687b7..e2b0418d55 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
@@ -39,7 +40,7 @@ object TwitterPopularTags {
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters)
+ val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index beda73a71b..03902ec353 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
import akka.util.ByteString
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.zeromq._
+
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
* every one second.
@@ -83,11 +85,10 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
//For this stream, a zeroMQ publisher should be running.
- val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
-
}
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000000..443910a03a
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 60d79175f1..ce3ef47cfe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.dstream
+package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
@@ -30,9 +30,10 @@ import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
private[streaming]
class FlumeInputDStream[T: ClassTag](
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
new file mode 100644
index 0000000000..834b775d4f
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+
+object FlumeUtils {
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port)
+ }
+
+ /**
+ * Creates a input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel)
+ }
+}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..733389b98d
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ // tests the API, does not actually test data receiving
+ JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000000..2e8e9fac45
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.util.ManualClock
+
+class FlumeStreamSuite extends TestSuiteBase {
+
+ val testPort = 9999
+
+ test("flume input stream") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+ Thread.sleep(1000)
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
+ val client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver)
+
+ for (i <- 0 until input.size) {
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ client.append(event)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
+ Thread.sleep(100)
+ }
+ Thread.sleep(1000)
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+
+ assert(outputBuffer.size === input.length)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ val str = decoder.decode(outputBuffer(i).head.event.getBody)
+ assert(str.toString === input(i).toString)
+ assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+ }
+ }
+}
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
new file mode 100644
index 0000000000..f782e0e126
--- /dev/null
+++ b/external/kafka/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Kafka</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sksamuel.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>0.8.0-beta1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 526f5564c7..a2cd49c573 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.dstream
+package org.apache.spark.streaming.kafka
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
+import scala.collection.Map
+import scala.reflect.ClassTag
import java.util.Properties
import java.util.concurrent.Executors
@@ -30,8 +29,10 @@ import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
-import scala.collection.Map
-import scala.reflect.ClassTag
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
/**
* Input stream that pulls messages from a Kafka Broker.
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000000..c2d851f943
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.{Decoder, StringDecoder}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+
+
+object KafkaUtils {
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ *
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages form a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+ */
+ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ jssc: JavaStreamingContext,
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ createStream[K, V, U, T](
+ jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
new file mode 100644
index 0000000000..7b4999447e
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.util.HashMap;
+import org.junit.Test;
+import com.google.common.collect.Maps;
+import kafka.serializer.StringDecoder;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testKafkaStream() {
+ HashMap<String, Integer> topics = Maps.newHashMap();
+
+ // tests the API, does not actually test data receiving
+ JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+
+ HashMap<String, String> kafkaParams = Maps.newHashMap();
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+ String.class, String.class, StringDecoder.class, StringDecoder.class,
+ kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
new file mode 100644
index 0000000000..9c81f23c19
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.serializer.StringDecoder
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
+class KafkaStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val topics = Map("my-topic" -> 1)
+
+ // tests the API, does not actually test data receiving
+ val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
+ val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // TODO: Actually test receiving data
+ }
+}
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
new file mode 100644
index 0000000000..31b4fa87de
--- /dev/null
+++ b/external/mqtt/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External MQTT</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <repositories>
+ <repository>
+ <id>mqtt-repo</id>
+ <name>MQTT Repository</name>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>0.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index ef4a737568..c8987a3ee0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.dstream
+package org.apache.spark.streaming.mqtt
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import java.util.Properties
import java.util.concurrent.Executors
@@ -34,10 +35,10 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
/**
* Input stream that subscribe messages from a Mqtt Broker.
@@ -49,15 +50,14 @@ import scala.reflect.ClassTag
private[streaming]
class MQTTInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
def getReceiver(): NetworkReceiver[T] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
+ new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
new file mode 100644
index 0000000000..0e6c25dbee
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import scala.reflect.ClassTag
+
+object MQTTUtils {
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ): JavaDStream[String] = {
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, brokerUrl, topic, storageLevel)
+ }
+}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
new file mode 100644
index 0000000000..44743aaecf
--- /dev/null
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.junit.Test;
+
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testMQTTStream() {
+ String brokerUrl = "abc";
+ String topic = "def";
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
new file mode 100644
index 0000000000..fcc159e85a
--- /dev/null
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
+class MQTTStreamSuite extends TestSuiteBase {
+
+ test("MQTT input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val brokerUrl = "abc"
+ val topic = "def"
+
+ // tests the API, does not actually test data receiving
+ val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // TODO: Actually test receiving data
+ }
+}
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
new file mode 100644
index 0000000000..216e6c1d8f
--- /dev/null
+++ b/external/twitter/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-twitter_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Twitter</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>3.0.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 387e15b0e6..5cc721d7f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.dstream
+package org.apache.spark.streaming.twitter
-import org.apache.spark._
-import org.apache.spark.streaming._
-import storage.StorageLevel
+import java.util.prefs.Preferences
import twitter4j._
import twitter4j.auth.Authorization
-import java.util.prefs.Preferences
import twitter4j.conf.ConfigurationBuilder
import twitter4j.conf.PropertyConfiguration
import twitter4j.auth.OAuthAuthorization
import twitter4j.auth.AccessToken
+import org.apache.spark._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.storage.StorageLevel
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
new file mode 100644
index 0000000000..5e506ffabc
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+
+object TwitterUtils {
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param ssc StreamingContext object
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ ssc: StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
+ */
+ def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ createStream(jssc.ssc, None)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
+ * @param twitterAuth Twitter4J Authorization
+ */
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ createStream(jssc.ssc, Some(twitterAuth))
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
+ * @param twitterAuth Twitter4J Authorization
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ twitterAuth: Authorization,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ createStream(jssc.ssc, Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ twitterAuth: Authorization,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
+ }
+}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
new file mode 100644
index 0000000000..e46b4e5c75
--- /dev/null
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+import twitter4j.Status;
+import twitter4j.auth.Authorization;
+import twitter4j.auth.NullAuthorization;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testTwitterStream() {
+ String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
+ Authorization auth = NullAuthorization.getInstance();
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
+ JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters);
+ JavaDStream<Status> test3 = TwitterUtils.createStream(
+ ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth);
+ JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
+ JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
+ auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
new file mode 100644
index 0000000000..a0a8fe617b
--- /dev/null
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+import twitter4j.auth.{NullAuthorization, Authorization}
+
+class TwitterStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val filters = Seq("filter1", "filter2")
+ val authorization: Authorization = NullAuthorization.getInstance()
+
+ // tests the API, does not actually test data receiving
+ val test1 = TwitterUtils.createStream(ssc, None)
+ val test2 = TwitterUtils.createStream(ssc, None, filters)
+ val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = TwitterUtils.createStream(ssc, Some(authorization))
+ val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
+ StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // Note that actually testing the data receiving is hard as authentication keys are
+ // necessary for accessing Twitter live stream
+ }
+}
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
new file mode 100644
index 0000000000..c240d59574
--- /dev/null
+++ b/external/zeromq/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-zeromq_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External ZeroMQ</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index f164d516b0..769761e3b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.receivers
+package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
@@ -24,6 +24,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
+import org.apache.spark.streaming.receivers._
/**
* A receiver to subscribe to ZeroMQ stream.
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
new file mode 100644
index 0000000000..546d9df3b5
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+
+object ZeroMQUtils {
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
+ * of byte thus it needs the converter(which might be deserializer of bytes)
+ * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+ }
+}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
new file mode 100644
index 0000000000..d2361e14b8
--- /dev/null
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq;
+
+import org.junit.Test;
+import akka.actor.SupervisorStrategy;
+import akka.util.ByteString;
+import akka.zeromq.Subscribe;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testZeroMQStream() {
+ String publishUrl = "abc";
+ Subscribe subscribe = new Subscribe((ByteString)null);
+ Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+ };
+
+ JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ SupervisorStrategy.defaultStrategy());
+ }
+}
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
new file mode 100644
index 0000000000..4193b8a02f
--- /dev/null
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+
+class ZeroMQStreamSuite extends TestSuiteBase {
+
+ test("zeromq input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val publishUrl = "abc"
+ val subscribe = new Subscribe(null.asInstanceOf[ByteString])
+ val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
+
+ // tests the API, does not actually test data receiving
+ val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2 = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+
+ // TODO: Actually test data receiving
+ }
+}
diff --git a/pom.xml b/pom.xml
index 78d2f162b5..c8e373772e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,12 +87,17 @@
<modules>
<module>core</module>
<module>bagel</module>
- <module>examples</module>
<module>mllib</module>
<module>tools</module>
<module>streaming</module>
<module>repl</module>
<module>assembly</module>
+ <module>external/twitter</module>
+ <module>external/kafka</module>
+ <module>external/flume</module>
+ <module>external/zeromq</module>
+ <module>external/mqtt</module>
+ <module>examples</module>
</modules>
<properties>
@@ -140,17 +145,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
- <repository>
- <id>mqtt-repo</id>
- <name>MQTT Repository</name>
- <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
</repositories>
<dependencyManagement>
@@ -259,17 +253,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>6.4.4</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 051e5105f3..bd5816ea28 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,9 +50,6 @@ object SparkBuild extends Build {
lazy val repl = Project("repl", file("repl"), settings = replSettings)
.dependsOn(core, bagel, mllib)
- lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming)
-
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
@@ -93,10 +90,31 @@ object SparkBuild extends Build {
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](if (isNewHadoop) yarn else yarnAlpha) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](if (isNewHadoop) yarn else yarnAlpha) else Seq[ProjectReference]()
+ lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
+ .dependsOn(streaming % "compile->compile;test->test")
+
+ lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+ lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+
+ lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
+ .dependsOn(core, mllib, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
+
// Everything except assembly, tools and examples belong to packageProjects
lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
- lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
+ lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
@@ -125,7 +143,7 @@ object SparkBuild extends Build {
// also check the local Maven repository ~/.m2
resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
- // For Sonatype publishing
+ // For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -167,7 +185,7 @@ object SparkBuild extends Build {
</issueManagement>
),
-/*
+ /*
publishTo <<= version { (v: String) =>
val nexus = "https://oss.sonatype.org/"
if (v.trim.endsWith("SNAPSHOT"))
@@ -176,8 +194,7 @@ object SparkBuild extends Build {
Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
},
-*/
-
+ */
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.13.Final",
@@ -266,7 +283,6 @@ object SparkBuild extends Build {
libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
-
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples",
libraryDependencies ++= Seq(
@@ -303,22 +319,8 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
- resolvers ++= Seq(
- "Eclipse Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
- "Apache repo" at "https://repository.apache.org/content/repositories/releases"
- ),
-
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
- exclude("com.sun.jdmk", "jmxtools")
- exclude("com.sun.jmx", "jmxri")
- exclude("net.sf.jopt-simple", "jopt-simple")
- excludeAll(excludeNetty),
- "org.eclipse.paho" % "mqtt-client" % "0.4.0",
- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+ "commons-io" % "commons-io" % "2.4"
)
)
@@ -352,8 +354,8 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
- "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
)
@@ -378,4 +380,43 @@ object SparkBuild extends Build {
case _ => MergeStrategy.first
}
)
+
+ def twitterSettings() = sharedSettings ++ Seq(
+ name := "spark-streaming-twitter",
+ libraryDependencies ++= Seq(
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
+ )
+ )
+
+ def kafkaSettings() = sharedSettings ++ Seq(
+ name := "spark-streaming-kafka",
+ libraryDependencies ++= Seq(
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ exclude("com.sun.jdmk", "jmxtools")
+ exclude("com.sun.jmx", "jmxri")
+ exclude("net.sf.jopt-simple", "jopt-simple")
+ excludeAll(excludeNetty)
+ )
+ )
+
+ def flumeSettings() = sharedSettings ++ Seq(
+ name := "spark-streaming-flume",
+ libraryDependencies ++= Seq(
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy)
+ )
+ )
+
+ def zeromqSettings() = sharedSettings ++ Seq(
+ name := "spark-streaming-zeromq",
+ libraryDependencies ++= Seq(
+ "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
+ )
+ )
+
+ def mqttSettings() = streamingSettings ++ Seq(
+ name := "spark-streaming-mqtt",
+ resolvers ++= Seq("Eclipse Repo" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
+ libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0")
+ )
}
diff --git a/streaming/pom.xml b/streaming/pom.xml
index e3b6fee9b2..459756912d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -61,59 +61,10 @@
<version>1.9.11</version>
</dependency>
<dependency>
- <groupId>com.sksamuel.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.0-beta1</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <version>1.2.0</version>
- <exclusions>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>3.0.3</version>
- <exclusions>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
@@ -137,11 +88,6 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>mqtt-client</artifactId>
- <version>0.4.0</version>
- </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -151,6 +97,35 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
+
+ <!--
+ This plugin forces the generation of jar containing streaming test classes,
+ so that the tests classes of external modules can use them. The two execution profiles
+ are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally,
+ 'mvn compile' should not compile test classes and therefore should not need this.
+ However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+ causes the compilation to fail if streaming test-jar is not generated. Hence, the
+ second execution profile for 'mvn compile'.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>test-jar-on-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b3a7cf08b9..693cb7fc30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -17,23 +17,6 @@
package org.apache.spark.streaming
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.StreamingListener
-
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.receivers.ZeroMQReceiver
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
-import org.apache.spark.streaming.receivers.ActorReceiver
-import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
-
import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag
@@ -41,14 +24,21 @@ import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import twitter4j.Status
-import twitter4j.auth.Authorization
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.scheduler._
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -238,74 +228,6 @@ class StreamingContext private (
}
/**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
- kafkaParams,
- topics,
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[
- K: ClassTag,
- V: ClassTag,
- U <: kafka.serializer.Decoder[_]: Manifest,
- T <: kafka.serializer.Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
@@ -344,22 +266,6 @@ class StreamingContext private (
}
/**
- * Create a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream (
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create a input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
@@ -434,24 +340,6 @@ class StreamingContext private (
}
/**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Option[Authorization] = None,
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
- val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
-
- /**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
@@ -484,21 +372,6 @@ class StreamingContext private (
inputStream
}
-/**
- * Create an input stream that receives messages pushed by a mqtt publisher.
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
-
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 7dec4b3ad7..7068f32517 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,19 +17,16 @@
package org.apache.spark.streaming.api.java
-import java.io.InputStream
-import java.lang.{Integer => JInt}
-import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import java.io.InputStream
+import java.lang.{Integer => JInt}
+import java.util.{List => JList, Map => JMap}
+
import akka.actor.{Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import twitter4j.Status
-import twitter4j.auth.Authorization
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
@@ -37,7 +34,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@@ -141,81 +137,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt])
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- StorageLevel.MEMORY_ONLY_SER_2)
-
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- */
- def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel)
- : JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
* Create a input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
* lines.
@@ -329,98 +250,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
- JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port, storageLevel)
- }
-
-
- /**
- * Create a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def twitterStream(
- filters: Array[String]
- ): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
- */
- def twitterStream(
- twitterAuth: Authorization
- ): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret to be set.
- */
- def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream()
- }
-
- /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
@@ -483,70 +312,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- */
- def zeroMQStream[T](
- publisherUrl:String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
- }
-
- /**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index abff55d77c..75f7244643 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -160,7 +160,9 @@ class NetworkInputTracker(
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
- ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ if (!ssc.sparkContext.isLocal) {
+ ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+ }
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index d53d433693..0d2145da9a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,24 +17,19 @@
package org.apache.spark.streaming;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-
-import kafka.serializer.StringDecoder;
+import scala.Tuple2;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
+import java.io.*;
+import java.util.*;
-import scala.Tuple2;
-import twitter4j.Status;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -44,42 +39,11 @@ import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
-import org.apache.spark.streaming.JavaTestUtils;
-import org.apache.spark.streaming.JavaCheckpointTestUtils;
-
-import java.io.*;
-import java.util.*;
-
-import akka.actor.Props;
-import akka.zeromq.Subscribe;
-
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
- private transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port");
- }
-
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -1601,26 +1565,6 @@ public class JavaAPISuite implements Serializable {
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@Test
- public void testKafkaStream() {
- HashMap<String, Integer> topics = Maps.newHashMap();
- JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
- StorageLevel.MEMORY_AND_DISK());
-
- HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
- kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = ssc.kafkaStream(
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topics,
- StorageLevel.MEMORY_AND_DISK());
- }
-
- @Test
public void testSocketTextStream() {
JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@@ -1658,36 +1602,4 @@ public class JavaAPISuite implements Serializable {
public void testRawSocketStream() {
JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
-
- @Test
- public void testFlumeStream() {
- JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
- }
-
- @Test
- public void testFileStream() {
- JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
- }
-
- @Test
- public void testTwitterStream() {
- String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
- }
-
- @Test
- public void testActorStream() {
- JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
- }
-
- @Test
- public void testZeroMQStream() {
- JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
- @Override
- public Iterable<String> call(byte[][] b) throws Exception {
- return null;
- }
- });
- }
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000000..34bee56885
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port");
+ System.clearProperty("spark.hostPort");
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 5185954521..a8e053278a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-import org.apache.flume.source.avro.AvroSourceProtocol
-import org.apache.flume.source.avro.AvroFlumeEvent
-import org.apache.flume.source.avro.Status
-import org.apache.avro.ipc.{specific, NettyTransceiver}
-import org.apache.avro.ipc.specific.SpecificRequestor
-import java.nio.ByteBuffer
import collection.JavaConversions._
-import java.nio.charset.Charset
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
@@ -56,7 +49,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
@@ -99,62 +92,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("flume input stream") {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
- val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val input = Seq(1, 2, 3, 4, 5)
- Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
-
- for (i <- 0 until input.size) {
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- client.append(event)
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- }
-
- val startTime = System.currentTimeMillis()
- while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
- Thread.sleep(100)
- }
- Thread.sleep(1000)
- val timeTaken = System.currentTimeMillis() - startTime
- assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- logInfo("Stopping context")
- ssc.stop()
-
- val decoder = Charset.forName("UTF-8").newDecoder()
-
- assert(outputBuffer.size === input.length)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- val str = decoder.decode(outputBuffer(i).head.event.getBody)
- assert(str.toString === input(i).toString)
- assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
- }
- }
-
-
test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
@@ -206,7 +150,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
testServer.start()
// Set up the streaming context and input streams
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -249,20 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("kafka input stream") {
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
- val topics = Map("my-topic" -> 1)
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
-
- // Test specifying decoder
- val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[
- String,
- String,
- kafka.serializer.StringDecoder,
- kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
- }
test("multi-thread receiver") {
// set up the test receiver
@@ -273,7 +203,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
MultiThreadTestReceiver.haveAllThreadsFinished = false
// set up the network stream using the test receiver
- val ssc = new StreamingContext(new SparkContext(conf), batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
val networkStream = ssc.networkStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 33464bc3a1..b20d02f996 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -142,16 +142,12 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Default before function for any streaming test suite. Override this
// if you want to add your stuff to "before" (i.e., don't call before { } )
def beforeFunction() {
- //if (useManualClock) {
- // System.setProperty(
- // "spark.streaming.clock",
- // "org.apache.spark.streaming.util.ManualClock"
- // )
- //} else {
- // System.clearProperty("spark.streaming.clock")
- //}
if (useManualClock) {
+ logInfo("Using manual clock")
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ } else {
+ logInfo("Using real clock")
+ conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
}
}
@@ -175,9 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
): StreamingContext = {
- val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(sc, batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
@@ -201,9 +196,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
): StreamingContext = {
- val sc = new SparkContext(conf)
// Create StreamingContext
- val ssc = new StreamingContext(sc, batchDuration)
+ val ssc = new StreamingContext(conf, batchDuration)
if (checkpointDir != null) {
ssc.checkpoint(checkpointDir)
}
@@ -279,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(100)
+ Thread.sleep(10)
}
val timeTaken = System.currentTimeMillis() - startTime