Diffstat (limited to 'external')
-rw-r--r--  external/flume/pom.xml | 93
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala | 48
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala | 43
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 155
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala | 23
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 35
-rw-r--r--  external/flume/src/test/resources/log4j.properties | 29
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 86
-rw-r--r--  external/kafka/pom.xml | 97
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala | 107
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala | 73
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 154
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala | 23
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 50
-rw-r--r--  external/kafka/src/test/resources/log4j.properties | 29
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 39
-rw-r--r--  external/mqtt/pom.xml | 108
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala | 59
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala | 43
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 110
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala | 24
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 39
-rw-r--r--  external/mqtt/src/test/resources/log4j.properties | 29
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 36
-rw-r--r--  external/twitter/pom.xml | 89
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala | 99
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala | 49
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 100
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala | 22
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java | 50
-rw-r--r--  external/twitter/src/test/resources/log4j.properties | 29
-rw-r--r--  external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 42
-rw-r--r--  external/zeromq/pom.xml | 89
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala | 102
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala | 57
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 54
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala | 24
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 53
-rw-r--r--  external/zeromq/src/test/resources/log4j.properties | 29
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 44
40 files changed, 2464 insertions(+), 0 deletions(-)
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
new file mode 100644
index 0000000000..443910a03a
--- /dev/null
+++ b/external/flume/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
new file mode 100644
index 0000000000..3347d19796
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.flume
+
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.flume._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides
+ * extra functions for creating Flume input streams.
+ */
+class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
+ javaStreamingContext.ssc.flumeStream(hostname, port)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+ JavaDStream[SparkFlumeEvent] = {
+ javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
+ }
+}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala
new file mode 100644
index 0000000000..35e7a01abc
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions.
+ */
+class FlumeFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream (
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
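
For reference, a minimal usage sketch of the implicit API above; the master URL, application name, port, and batch interval are illustrative, not part of this patch:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume._   // brings the sscToFlumeFunctions implicit into scope

    val ssc = new StreamingContext("local[2]", "FlumeExample", Seconds(1))
    // flumeStream is added to StreamingContext by the implicit conversion; a Flume
    // agent with an Avro sink must be configured to send events to localhost:9999.
    val events = ssc.flumeStream("localhost", 9999)
    events.count().print()
    ssc.start()
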
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
new file mode 100644
index 0000000000..ce3ef47cfe
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.InetSocketAddress
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.avro.ipc.NettyServer
+
+import org.apache.spark.util.Utils
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+private[streaming]
+class FlumeInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ new FlumeReceiver(host, port, storageLevel)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvents with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event : AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput) {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key : String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value : String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput) {
+ val body = event.getBody.array()
+ out.writeInt(body.length)
+ out.write(body)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+private[streaming] object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+private[streaming]
+class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event : AvroFlumeEvent) : Status = {
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ Status.OK
+ }
+
+ override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ events.foreach (event =>
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface. */
+private[streaming]
+class FlumeReceiver(
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent] {
+
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
+ blockGenerator.start()
+ server.start()
+ logInfo("Flume receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ logInfo("Flume receiver stopped")
+ }
+
+ override def getLocationPreference = Some(host)
+}
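
Because SparkFlumeEvent implements Externalizable, the length-prefixed wire format defined by writeExternal/readExternal is exercised by plain Java serialization. A minimal round-trip sketch (the body and header values are illustrative; both must be set before serializing, since writeExternal dereferences event.getBody and event.getHeaders):

    import java.io._
    import java.nio.ByteBuffer
    import org.apache.flume.source.avro.AvroFlumeEvent

    // Build an event with a body and one header.
    val avro = new AvroFlumeEvent()
    avro.setBody(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    val headers = new java.util.HashMap[CharSequence, CharSequence]()
    headers.put("test", "header")
    avro.setHeaders(headers)
    val original = new SparkFlumeEvent()
    original.event = avro

    // Serialize and deserialize through the custom Externalizable format.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(original)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = in.readObject().asInstanceOf[SparkFlumeEvent]
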
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
new file mode 100644
index 0000000000..c087a39d1c
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object flume {
+ implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc)
+}
+
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
new file mode 100644
index 0000000000..5930fee925
--- /dev/null
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+import org.junit.Test;
+
+public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testFlumeStream() {
+ FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/flume/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
new file mode 100644
index 0000000000..74840f6499
--- /dev/null
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.util.ManualClock
+
+class FlumeStreamSuite extends TestSuiteBase {
+
+ val testPort = 9999
+
+ test("flume input stream") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+ Thread.sleep(1000)
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
+ val client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver)
+
+ for (i <- 0 until input.size) {
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ client.append(event)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
+ Thread.sleep(100)
+ }
+ Thread.sleep(1000)
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+
+ assert(outputBuffer.size === input.length)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ val str = decoder.decode(outputBuffer(i).head.event.getBody)
+ assert(str.toString === input(i).toString)
+ assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+ }
+ }
+}
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
new file mode 100644
index 0000000000..f782e0e126
--- /dev/null
+++ b/external/kafka/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Kafka</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sksamuel.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>0.8.0-beta1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
new file mode 100644
index 0000000000..491331bb37
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.Decoder
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides
+ * extra functions for creating Kafka input streams.
+ */
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ *
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param keyTypeClass Key type of the RDD
+ * @param valueTypeClass Value type of the RDD
+ * @param keyDecoderClass Type of Kafka key decoder
+ * @param valueDecoderClass Type of Kafka value decoder
+ * @param kafkaParams Map of Kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+ def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ javaStreamingContext.ssc.kafkaStream[K, V, U, T](
+ kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
new file mode 100644
index 0000000000..2135634a69
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.ClassTag
+
+import kafka.serializer.{Decoder, StringDecoder}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions.
+ */
+class KafkaFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ kafkaStream[String, String, StringDecoder, StringDecoder](
+ kafkaParams,
+ topics,
+ storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of Kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
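
For reference, a minimal usage sketch of the implicit API above; the quorum address, group id, topic name, and batch interval are illustrative:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka._   // brings the sscToKafkaFunctions implicit into scope

    val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(2))
    // One consumer thread is started per partition, so "my-topic" -> 1 yields one thread.
    val lines = ssc.kafkaStream("zk1:2181,zk2:2181", "example-group", Map("my-topic" -> 1))
    lines.map(_._2).count().print()
    ssc.start()
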
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
new file mode 100644
index 0000000000..a2cd49c573
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.Map
+import scala.reflect.ClassTag
+
+import java.util.Properties
+import java.util.concurrent.Executors
+
+import kafka.consumer._
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+/**
+ * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
+ @transient ssc_ : StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[(K, V)] = {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ .asInstanceOf[NetworkReceiver[(K, V)]]
+ }
+}
+
+private[streaming]
+class KafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+
+ // Handles pushing data into the BlockManager
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+ // Connection to Kafka
+ var consumerConnector : ConsumerConnector = null
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // Create a thread pool with one thread per topic partition being consumed
+ val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
+
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
+
+ // Kafka connection properties
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+
+ // Create the connection to the cluster
+ logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+ val consumerConfig = new ConsumerConfig(props)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+
+ // When auto.offset.reset is defined, it is our responsibility to try to delete the
+ // consumer group's ZooKeeper node.
+ if (kafkaParams.contains("auto.offset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+ }
+
+ val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ // Create threads for each topic/message stream we are listening to
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+
+ // Start the message handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ }
+
+ // Handles Kafka Messages
+ private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+ extends Runnable {
+ def run() {
+ logInfo("Starting MessageHandler.")
+ for (msgAndMetadata <- stream) {
+ blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+ }
+ }
+ }
+
+ // It is our responsibility to delete the consumer group node when auto.offset.reset is
+ // specified, because Kafka 0.7.2 only honors this parameter when the group is not already in ZooKeeper.
+ //
+ // The Kafka high-level consumer doesn't currently expose setting offsets; this is a trick copied from
+ // Kafka's ConsoleConsumer. See the code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _ : Throwable => // swallow
+ }
+ }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
new file mode 100644
index 0000000000..44e7ce6e1b
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object kafka {
+ implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc)
+}
+
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
new file mode 100644
index 0000000000..fdea96e506
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
+import org.junit.Test;
+import com.google.common.collect.Maps;
+import kafka.serializer.StringDecoder;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testKafkaStream() {
+
+ HashMap<String, Integer> topics = Maps.newHashMap();
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
+
+ // tests the API, does not actually test data receiving
+ JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+
+ HashMap<String, String> kafkaParams = Maps.newHashMap();
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
+ String.class, String.class, StringDecoder.class, StringDecoder.class,
+ kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
new file mode 100644
index 0000000000..2ef3e99c55
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.serializer.StringDecoder
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
+class KafkaStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val topics = Map("my-topic" -> 1)
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
+ val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val kafkaParams = Map("zookeeper.connect" -> "localhost:12345", "group.id" -> "consumer-group")
+ val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
+ kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // TODO: Actually test receiving data
+ }
+}
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
new file mode 100644
index 0000000000..31b4fa87de
--- /dev/null
+++ b/external/mqtt/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External MQTT</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <repositories>
+ <repository>
+ <id>mqtt-repo</id>
+ <name>MQTT Repository</name>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>0.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000000..72124956fc
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.mqtt
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides
+ * extra functions for creating MQTT input streams.
+ */
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that receives messages pushed by an MQTT publisher.
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic topic name to subscribe to
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String
+ ): JavaDStream[String] = {
+ implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by an MQTT publisher.
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): JavaDStream[String] = {
+ implicit val cmt: ClassTag[String] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+ }
+}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
new file mode 100644
index 0000000000..86f4e9c724
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
+ */
+class MQTTFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that receives messages pushed by an MQTT publisher.
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
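For illustration, a minimal usage sketch of the implicit conversion above (assumes an existing StreamingContext `ssc`; the broker URL and topic are placeholders):

    import org.apache.spark.streaming.mqtt._

    // Subscribe to a hypothetical topic on a local broker and print each batch
    val lines = ssc.mqttStream("tcp://localhost:1883", "sensors/temperature")
    lines.print()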
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
new file mode 100644
index 0000000000..c8987a3ee0
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import scala.reflect.ClassTag
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
+import org.eclipse.paho.client.mqttv3.MqttCallback
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+
+/**
+ * Input stream that subscribes to messages from an MQTT broker.
+ * Uses the Eclipse Paho client library: http://www.eclipse.org/paho/
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class MQTTInputDStream[T: ClassTag](
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[T](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming]
+class MQTTReceiver(brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
+ protected lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ // Set up in-memory persistence for in-flight messages
+ val persistence: MqttClientPersistence = new MemoryPersistence()
+
+ // Initialize the MQTT client with the broker URL, a client ID and the persistence store
+ val client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", persistence)
+
+ // Callback triggered whenever a new message arrives on the subscribed topic
+ val callback: MqttCallback = new MqttCallback() {
+
+ // Store the payload of each incoming MQTT message
+ override def messageArrived(arg0: String, arg1: MqttMessage) {
+ blockGenerator += new String(arg1.getPayload())
+ }
+
+ override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(arg0: Throwable) {
+ logInfo("Connection lost " + arg0)
+ }
+ }
+
+ // Register the callback before connecting, so that no message is missed
+ client.setCallback(callback)
+
+ // Connect to the MQTT broker and subscribe to the topic
+ client.connect()
+ client.subscribe(topic)
+ }
+}
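The receiver above covers only the subscribing side. For local experimentation one also needs a publisher; a rough sketch using the same Paho 3.x client API (broker URL, client ID and topic are placeholders):

    import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

    // Connect a second client to the same hypothetical broker and publish one message
    val pubClient = new MqttClient("tcp://localhost:1883", "MQTTPub", new MemoryPersistence())
    pubClient.connect()
    pubClient.getTopic("sensors/temperature").publish(new MqttMessage("23.5".getBytes("UTF-8")))
    pubClient.disconnect()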
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
new file mode 100644
index 0000000000..28a944f57e
--- /dev/null
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object mqtt {
+ implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
+}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
new file mode 100644
index 0000000000..3ddb4d084f
--- /dev/null
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
+import org.junit.Test;
+
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+
+public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testMQTTStream() {
+ String brokerUrl = "abc";
+ String topic = "def";
+ MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+ JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
+ StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
new file mode 100644
index 0000000000..ab6542918b
--- /dev/null
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.mqtt
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+
+class MQTTStreamSuite extends TestSuiteBase {
+
+ test("MQTT input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val brokerUrl = "abc"
+ val topic = "def"
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.mqttStream(brokerUrl, topic)
+ val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // TODO: Actually test receiving data
+ }
+}
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
new file mode 100644
index 0000000000..216e6c1d8f
--- /dev/null
+++ b/external/twitter/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-twitter_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External Twitter</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>3.0.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000000..22e297a03a
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides
+ * extra functions for creating Twitter input streams.
+ */
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ */
+ def twitterStream(): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(filters: Array[String]): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None, filters)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ */
+ def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ }
+}
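The zero-argument overloads rely on Twitter4J picking up OAuth credentials from system properties, so a caller would set them before creating the stream; a sketch with placeholder values:

    // Placeholder credentials; Twitter4J reads these when no Authorization object is passed
    System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
    System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
    System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")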
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
new file mode 100644
index 0000000000..e91049d9b1
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+/**
+ * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions.
+ */
+class TwitterFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Option[Authorization],
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
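A minimal usage sketch of the implicit conversion above (assumes an existing StreamingContext `ssc` and credentials already configured; the filter keywords are placeholders):

    import org.apache.spark.streaming.twitter._

    // Track two hypothetical keywords using the default system-property OAuth
    val tweets = ssc.twitterStream(None, Seq("spark", "streaming"))
    tweets.map(_.getText).print()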
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
new file mode 100644
index 0000000000..5cc721d7f9
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import twitter4j._
+import twitter4j.auth.Authorization
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.conf.ConfigurationBuilder
+
+import org.apache.spark._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A stream of Twitter statuses, potentially filtered by one or more keywords.
+ *
+ * @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
+ * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+ * such that this may return a sampled subset of all tweets during each interval.
+ *
+ * If no Authorization object is provided, OAuth authorization is initialized from the system
+ * properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
+ */
+private[streaming]
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ private def createOAuthAuthorization(): Authorization = {
+ new OAuthAuthorization(new ConfigurationBuilder().build())
+ }
+
+ private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
+
+ override def getReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(authorization, filters, storageLevel)
+ }
+}
+
+private[streaming]
+class TwitterReceiver(
+ twitterAuth: Authorization,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.nonEmpty) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ // Guard against stop() being called before onStart() has created the stream
+ if (twitterStream != null) {
+ twitterStream.shutdown()
+ }
+ logInfo("Twitter receiver stopped")
+ }
+}
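Instead of system properties, an explicit Authorization can be built with Twitter4J's ConfigurationBuilder and passed as `Some(auth)`; a sketch with placeholder credentials:

    import twitter4j.auth.OAuthAuthorization
    import twitter4j.conf.ConfigurationBuilder

    // Placeholder credentials, for illustration only
    val conf = new ConfigurationBuilder()
      .setOAuthConsumerKey("<consumerKey>")
      .setOAuthConsumerSecret("<consumerSecret>")
      .setOAuthAccessToken("<accessToken>")
      .setOAuthAccessTokenSecret("<accessTokenSecret>")
      .build()
    val auth = Some(new OAuthAuthorization(conf))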
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
new file mode 100644
index 0000000000..23f82c5885
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object twitter {
+ implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc)
+}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
new file mode 100644
index 0000000000..4564d6cd33
--- /dev/null
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter;
+
+import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
+import org.junit.Test;
+
+import twitter4j.Status;
+import twitter4j.auth.Authorization;
+import twitter4j.auth.NullAuthorization;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testTwitterStream() {
+ TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
+ String[] filters = { "filter1", "filter2" };
+ Authorization auth = NullAuthorization.getInstance();
+
+ // tests the API, does not actually test data receiving
+ JavaDStream<Status> test1 = twitterFunc.twitterStream();
+ JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
+ JavaDStream<Status> test3 =
+ twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
+ JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
+ JavaDStream<Status> test6 =
+ twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ }
+}
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
new file mode 100644
index 0000000000..d7f6d35e07
--- /dev/null
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.storage.StorageLevel
+import twitter4j.auth.{NullAuthorization, Authorization}
+
+class TwitterStreamSuite extends TestSuiteBase {
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val filters = Seq("filter1", "filter2")
+ val authorization: Authorization = NullAuthorization.getInstance()
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.twitterStream(None)
+ val test2 = ssc.twitterStream(None, filters)
+ val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = ssc.twitterStream(Some(authorization))
+ val test5 = ssc.twitterStream(Some(authorization), filters)
+ val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ // Note that actually testing the data receiving is hard as authentication keys are
+ // necessary for accessing Twitter live stream
+ }
+}
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
new file mode 100644
index 0000000000..c240d59574
--- /dev/null
+++ b/external/zeromq/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.9.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-zeromq_2.10</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project External ZeroMQ</name>
+ <url>http://spark.incubator.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000000..a9bbce71f5
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java.zeromq
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
+
+/**
+ * Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides
+ * extra functions for creating ZeroMQ input streams.
+ */
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+
+ /**
+ * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes each message as a sequence of frames, and each
+ * frame is a sequence of bytes; this converter (typically a deserializer) turns a
+ * sequence of frames into an iterator of objects.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param supervisorStrategy Akka supervisor strategy for the receiver actor
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes each message as a sequence of frames, and each
+ * frame is a sequence of bytes; this converter (typically a deserializer) turns a
+ * sequence of frames into an iterator of objects.
+ * @param storageLevel RDD storage level.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes each message as a sequence of frames, and each
+ * frame is a sequence of bytes; this converter (typically a deserializer) turns a
+ * sequence of frames into an iterator of objects.
+ */
+ def zeroMQStream[T](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
new file mode 100644
index 0000000000..f4c75ab7c9
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.receivers._
+
+/**
+ * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
+ * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
+ */
+class ZeroMQFunctions(ssc: StreamingContext) {
+ /**
+ * Create an input stream that receives messages pushed by a ZeroMQ publisher.
+ * @param publisherUrl URL of the remote ZeroMQ publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes each message as a sequence of frames, and
+ * each frame is a sequence of bytes; this converter (typically a
+ * deserializer) turns a sequence of frames into an iterator of objects.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param supervisorStrategy Akka supervisor strategy for the receiver actor. Defaults to
+ * ReceiverSupervisorStrategy.defaultStrategy.
+ */
+ def zeroMQStream[T: ClassTag](
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
+}
+
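Since the receiver strips the topic frame before invoking the converter, a typical bytesToObjects for text payloads just decodes each remaining frame; a sketch:

    import akka.util.ByteString

    // Decode every payload frame of a multi-frame message as a UTF-8 string
    val bytesToStrings = (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator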
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..769761e3b8
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import scala.reflect.ClassTag
+
+import akka.actor.Actor
+import akka.util.ByteString
+import akka.zeromq._
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receivers._
+
+/**
+ * A receiver to subscribe to ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
+ extends Actor with Receiver with Logging {
+
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
+ Connect(publisherUrl), subscribe)
+
+ def receive: Receive = {
+
+ case Connecting ⇒ logInfo("connecting ...")
+
+ case m: ZMQMessage ⇒
+ logDebug("Received message for: " + m.frame(0))
+
+ // Ignore the first frame: it carries the topic, not the payload
+ val bytes = m.frames.tail
+ pushBlock(bytesToObjects(bytes))
+
+ case Closed ⇒ logInfo("received closed")
+ }
+}
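For completeness, a rough sketch of the publishing counterpart, assuming akka-zeromq's Pub socket API and its ZMQMessage varargs factory (endpoint and topic are placeholders):

    import akka.actor.ActorSystem
    import akka.util.ByteString
    import akka.zeromq._

    val system = ActorSystem("publisher")
    // Bind a hypothetical PUB socket and send one two-frame message: topic, then payload
    val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:5553"))
    pubSocket ! ZMQMessage(ByteString("events"), ByteString("hello"))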
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
new file mode 100644
index 0000000000..dc27178149
--- /dev/null
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+package object zeromq {
+ implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
+}
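Putting the pieces together, a minimal end-to-end sketch of the implicit conversion above (assumes an existing StreamingContext `ssc`; the endpoint and topic are placeholders, and Subscribe(String) is assumed to be akka-zeromq's convenience factory):

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq._

    // Subscribe to the hypothetical "events" topic and decode payload frames as UTF-8 text
    val lines = ssc.zeroMQStream("tcp://127.0.0.1:5553", Subscribe("events"),
      (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator)
    lines.print()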
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
new file mode 100644
index 0000000000..b020ae4cef
--- /dev/null
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq;
+
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
+import org.junit.Test;
+
+import akka.actor.SupervisorStrategy;
+import akka.util.ByteString;
+import akka.zeromq.Subscribe;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testZeroMQStream() {
+ ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
+ String publishUrl = "abc";
+ Subscribe subscribe = new Subscribe((ByteString)null);
+ Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+ };
+
+ JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+ }
+}
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..063529a9cb
--- /dev/null
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
new file mode 100644
index 0000000000..5adcdb821f
--- /dev/null
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.zeromq
+
+import akka.actor.SupervisorStrategy
+import akka.util.ByteString
+import akka.zeromq.Subscribe
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+
+class ZeroMQStreamSuite extends TestSuiteBase {
+
+ test("zeromq input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val publishUrl = "abc"
+ val subscribe = new Subscribe(null.asInstanceOf[ByteString])
+ val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
+
+ // tests the API, does not actually test data receiving
+ val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
+ val test2 = ssc.zeroMQStream(
+ publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+
+ // TODO: Actually test data receiving
+ }
+}