author     Tathagata Das <tathagata.das1565@gmail.com>  2014-01-07 01:56:15 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-01-07 01:56:15 -0800
commit     aa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch)
tree       33a1614e3d5ee7a050776e3601ba8c7430b573f8
parent     3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff)
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
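
In effect, this commit replaces two parallel APIs per source: the Scala implicit conversions added by each package object (e.g. ssc.kafkaStream(...) after import org.apache.spark.streaming.kafka._) and the separate Java XYZFunctions wrapper classes. Both are subsumed by a single XYZUtils singleton whose createStream methods take the context as an explicit first argument. A minimal before/after sketch in Scala for the Kafka source (the local master, topic name, and ZooKeeper address "zk:2181" are illustrative placeholders, not taken from the diff):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaMigrationSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2))
    val topics = Map("my-topic" -> 1)  // topic -> number of consumer threads

    // Before this commit (implicit conversion from the kafka package object):
    //   import org.apache.spark.streaming.kafka._
    //   val lines = ssc.kafkaStream("zk:2181", "group", topics).map(_._2)

    // After this commit: a plain method on the KafkaUtils singleton. The same
    // object also exposes JavaStreamingContext overloads for Java callers.
    val lines = KafkaUtils.createStream(ssc, "zk:2181", "group", topics).map(_._2)

    lines.print()
    ssc.start()
  }
}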
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java | 5
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 9
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 3
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala | 48
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala (renamed from external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala) | 41
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala | 23
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 13
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 2
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala | 107
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala | 73
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 153
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala | 23
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 11
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 8
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala | 43
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala) | 54
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala | 24
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 6
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala | 49
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala (renamed from external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala) | 75
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala | 22
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java | 20
-rw-r--r--  external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 13
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala | 57
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala (renamed from external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala) | 80
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala | 24
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 17
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 8
35 files changed, 383 insertions(+), 646 deletions(-)
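
Each file below follows the same shape, so the rationale is worth spelling out once: Java code can call neither Scala implicit conversions nor methods with default arguments, which is why every source previously needed a second, hand-written XYZFunctions class under api/java. A method on a top-level object compiles to a static-style call reachable from both languages, so one object with explicit Java overloads replaces both classes. A condensed, hypothetical sketch of the pattern (XYZUtils stands in for any of the five sources; socketTextStream is used as a stand-in for the source-specific receiver):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{DStream, StreamingContext}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}

object XYZUtils {
  // Scala entry point; the default storage level is visible to Scala only.
  def createStream(
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): DStream[String] = {
    // Stand-in: the real implementations construct a source-specific input
    // DStream here and call ssc.registerInputStream(...) before returning it.
    ssc.socketTextStream(hostname, port, storageLevel)
  }

  // Java entry points: explicit overloads (no default arguments) that unwrap
  // the JavaStreamingContext and delegate to the Scala method above.
  def createStream(jssc: JavaStreamingContext, hostname: String, port: Int): JavaDStream[String] =
    createStream(jssc.ssc, hostname, port)

  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel
    ): JavaDStream[String] =
    createStream(jssc.ssc, hostname, port, storageLevel)
}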
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 83900a18df..0a2b3def18 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
+import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
@@ -53,8 +53,7 @@ public class JavaFlumeEventCount {
JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
- FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
- JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port);
+ JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 51de4054cc..3bd7a3a90e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
+import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
@@ -54,7 +54,7 @@ public class JavaKafkaWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
@@ -65,8 +65,7 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
- JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap);
+ JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
@@ -96,6 +95,6 @@ public class JavaKafkaWordCount {
});
wordCounts.print();
- ssc.start();
+ jssc.start();
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 149640e0d1..ae3709b3d9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -52,7 +52,7 @@ object FlumeEventCount {
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+ val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 633712e816..022c8c5cb9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -53,7 +53,7 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index f65c3f8b91..325290b66f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -97,7 +97,7 @@ object MQTTWordCount {
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
StreamingContext.jarOfClass(this.getClass))
- val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+ val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index a60570f884..3ccdc908e2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -35,7 +35,7 @@ import org.apache.spark.streaming.twitter._
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
@@ -63,7 +63,7 @@ object TwitterAlgebirdCMS {
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 1382fa4d1d..c7e83e76b0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -50,7 +50,7 @@ object TwitterAlgebirdHLL {
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 84842b3d65..e2b0418d55 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -40,7 +40,7 @@ object TwitterPopularTags {
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters)
+ val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 789c5f2d08..5a7673756e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -85,11 +85,10 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
//For this stream, a zeroMQ publisher should be running.
- val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
-
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
deleted file mode 100644
index 3347d19796..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.api.java.flume
-
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.flume._
-import org.apache.spark.storage.StorageLevel
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Flume input streams.
- */
-class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- javaStreamingContext.ssc.flumeStream(hostname, port)
- }
-
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
- JavaDStream[SparkFlumeEvent] = {
- javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 35e7a01abc..834b775d4f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,20 +18,19 @@
package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-/**
- * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions.
- */
-class FlumeFunctions(ssc: StreamingContext) {
+object FlumeUtils {
/**
* Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
- def flumeStream (
+ def createStream (
+ ssc: StreamingContext,
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -40,4 +39,32 @@ class FlumeFunctions(ssc: StreamingContext) {
ssc.registerInputStream(inputStream)
inputStream
}
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port)
+ }
+
+ /**
+ * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ): JavaDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel)
+ }
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
deleted file mode 100644
index c087a39d1c..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object flume {
- implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc)
-}
-
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 5930fee925..733389b98d 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -1,4 +1,4 @@
-package org.apache.spark.streaming.flume;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,21 +15,20 @@ package org.apache.spark.streaming.flume;/*
* limitations under the License.
*/
+package org.apache.spark.streaming.flume;
+
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
- FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
-
// tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
+ JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 74840f6499..2e8e9fac45 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -39,7 +39,7 @@ class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
deleted file mode 100644
index 491331bb37..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.api.java.kafka
-
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
-import java.lang.{Integer => JInt}
-import java.util.{Map => JMap}
-
-import kafka.serializer.Decoder
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.kafka._
-
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Kafka input streams.
- */
-class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt]
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- *
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[String, String] = {
- implicit val cmt: ClassTag[String] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param keyTypeClass Key type of RDD
- * @param valueTypeClass value type of RDD
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level. Defaults to memory-only
- */
- def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]](
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
- implicit val valueCmt: ClassTag[V] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-
- implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
- implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
-
- javaStreamingContext.ssc.kafkaStream[K, V, U, T](
- kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
- }
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
deleted file mode 100644
index 2135634a69..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.reflect.ClassTag
-
-import kafka.serializer.{Decoder, StringDecoder}
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions.
- */
-class KafkaFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- */
- def kafkaStream(
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- kafkaStream[String, String, StringDecoder, StringDecoder](
- kafkaParams,
- topics,
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from a Kafka Broker.
- * @param kafkaParams Map of kafka configuration paramaters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000000..c2d851f943
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.lang.{Integer => JInt}
+import java.util.{Map => JMap}
+
+import kafka.serializer.{Decoder, StringDecoder}
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+
+
+object KafkaUtils {
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: Map[String, Int],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[(String, String)] = {
+ val kafkaParams = Map[String, String](
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param ssc StreamingContext object
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
+ ssc: StreamingContext,
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
+ * @param groupId The group id for this consumer
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt]
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ *
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[String, String] = {
+ implicit val cmt: ClassTag[String] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param jssc JavaStreamingContext object
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
+ * @param kafkaParams Map of kafka configuration parameters,
+ * see http://kafka.apache.org/08/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread
+ * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+ */
+ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+ jssc: JavaStreamingContext,
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
+ kafkaParams: JMap[String, String],
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel
+ ): JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassTag[K] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
+ implicit val valueCmt: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ createStream[K, V, U, T](
+ jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
deleted file mode 100644
index 44e7ce6e1b..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object kafka {
- implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc)
-}
-
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index fdea96e506..7b4999447e 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,32 +18,27 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
-
-import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
@Test
public void testKafkaStream() {
-
HashMap<String, Integer> topics = Maps.newHashMap();
- KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
+ JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 2ef3e99c55..9c81f23c19 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -28,11 +28,11 @@ class KafkaStreamSuite extends TestSuiteBase {
val topics = Map("my-topic" -> 1)
// tests the API, does not actually test data receiving
- val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
- val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+ val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
- val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
- kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
deleted file mode 100644
index 86f4e9c724..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
- */
-class MQTTFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- */
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 72124956fc..0e6c25dbee 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -15,45 +15,61 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.mqtt
-
-import scala.reflect.ClassTag
+package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.mqtt._
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import scala.reflect.ClassTag
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating MQTT input streams.
- */
-class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+object MQTTUtils {
+ /**
+ * Create an input stream that receives messages pushed by an MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
* @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
+ * @param topic Topic name to subscribe to
*/
- def mqttStream(
+ def createStream(
+ jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+ createStream(jssc.ssc, brokerUrl, topic)
}
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level.
*/
- def mqttStream(
+ def createStream(
+ jssc: JavaStreamingContext,
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ storageLevel: StorageLevel
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+ createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 28a944f57e..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object mqtt {
- implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
-}
-
-
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 3ddb4d084f..44743aaecf 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -29,11 +28,10 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index ab6542918b..fcc159e85a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -28,8 +28,8 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = ssc.mqttStream(brokerUrl, topic)
- val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
deleted file mode 100644
index e91049d9b1..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions.
- */
-class TwitterFunctions(ssc: StreamingContext) {
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Option[Authorization],
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
- val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 22e297a03a..5e506ffabc 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -15,29 +15,45 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.twitter
+package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
-
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, DStream}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.twitter._
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating Twitter input streams.
- */
-class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
+object TwitterUtils {
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param ssc StreamingContext object
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def createStream(
+ ssc: StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create a input stream that returns tweets received from Twitter using Twitter4J's default
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
*/
- def twitterStream(): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None)
+ def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ createStream(jssc.ssc, None)
}
/**
@@ -45,10 +61,11 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def twitterStream(filters: Array[String]): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None, filters)
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters)
}
/**
@@ -56,44 +73,54 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param jssc JavaStreamingContext object
+ * @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
- def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
+ def createStream(
+ jssc: JavaStreamingContext,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ createStream(jssc.ssc, None, filters, storageLevel)
}
/**
* Create an input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ createStream(jssc.ssc, Some(twitterAuth))
}
/**
* Create an input stream that returns tweets received from Twitter.
+ * @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param filters Set of filter strings to get only those tweets that match them
*/
- def twitterStream(
+ def createStream(
+ jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
+ createStream(jssc.ssc, Some(twitterAuth), filters)
}
/**
* Create an input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
+ * @param jssc JavaStreamingContext object
+ * @param twitterAuth Twitter4J Authorization object
+ * @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
- def twitterStream(
+ def createStream(
+ jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
- javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
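For reference, the new Scala entry point is invoked explicitly on the utility object rather than through an implicit conversion on StreamingContext. A minimal sketch, assuming ssc is an existing StreamingContext, the twitter4j.oauth.* system properties are set, and the filter terms are placeholders:

    import org.apache.spark.streaming.twitter.TwitterUtils

    // Passing None falls back to Twitter4J's default OAuth, read from the
    // twitter4j.oauth.* system properties; the filter terms are illustrative.
    val tweets = TwitterUtils.createStream(ssc, None, Seq("spark", "streaming"))
    tweets.map(_.getText).print()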
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
deleted file mode 100644
index 23f82c5885..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object twitter {
- implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc)
-}
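With this package object deleted, the implicit ssc.twitterStream(...) syntax no longer compiles, and callers switch to the explicit utility object. A before/after sketch, with both forms taken from the test changes below:

    // Before: implicit conversion supplied by the deleted package object
    // import org.apache.spark.streaming.twitter._
    // val tweets = ssc.twitterStream(None)

    // After: explicit call on the utility object
    import org.apache.spark.streaming.twitter.TwitterUtils
    val tweets = TwitterUtils.createStream(ssc, None)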
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 4564d6cd33..e46b4e5c75 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -19,13 +19,10 @@ package org.apache.spark.streaming.twitter;
import java.util.Arrays;
-import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
import org.junit.Test;
-
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.NullAuthorization;
-
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -33,18 +30,17 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = twitterFunc.twitterStream();
- JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
- JavaDStream<Status> test3 =
- twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
- JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
- JavaDStream<Status> test6 =
- twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
+ JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters);
+ JavaDStream<Status> test3 = TwitterUtils.createStream(
+ ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth);
+ JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
+ JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
+ auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index d7f6d35e07..a0a8fe617b 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -29,12 +29,13 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = ssc.twitterStream(None)
- val test2 = ssc.twitterStream(None, filters)
- val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = ssc.twitterStream(Some(authorization))
- val test5 = ssc.twitterStream(Some(authorization), filters)
- val test6 = ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = TwitterUtils.createStream(ssc, None)
+ val test2 = TwitterUtils.createStream(ssc, None, filters)
+ val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4 = TwitterUtils.createStream(ssc, Some(authorization))
+ val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
+ StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
deleted file mode 100644
index f4c75ab7c9..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.actor.{Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.receivers._
-
-/**
- * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
- */
-class ZeroMQFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-}
-
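The bytesToObjects converter documented above carries over to ZeroMQUtils unchanged. A minimal sketch of one such converter, assuming each frame's payload is UTF-8 text:

    import akka.util.ByteString

    // Decode each frame's payload as a UTF-8 string; utf8String comes
    // from akka.util.ByteString.
    val bytesToStrings: Seq[ByteString] => Iterator[String] =
      frames => frames.map(_.utf8String).iterator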
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index a9bbce71f5..546d9df3b5 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -15,37 +15,57 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.zeromq
+package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
-
-import akka.actor.SupervisorStrategy
+import akka.actor.{Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating ZeroMQ input streams.
- */
-class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+object ZeroMQUtils {
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each
+ * topic, and each frame is a sequence of bytes; this converter
+ * (typically a deserializer) translates that sequence of byte
+ * sequences, where each inner sequence is one frame's payload,
+ * into an iterator of objects.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
* sequence of bytes; this converter (typically a deserializer)
* translates that sequence of byte sequences, where each inner
* sequence is one frame's payload, into an iterable of objects.
* @param storageLevel Storage level to use for storing the received objects
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -54,21 +74,23 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
* sequence of bytes; this converter (typically a deserializer)
* translates that sequence of byte sequences, where each inner
* sequence is one frame's payload, into an iterable of objects.
- * @param storageLevel RDD storage level.
+ * @param storageLevel RDD storage level.
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -76,27 +98,29 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame is a
* sequence of bytes; this converter (typically a deserializer)
* translates that sequence of byte sequences, where each inner
* sequence is one frame's payload, into an iterable of objects.
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
}
}
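Putting the Scala side together, a minimal sketch assuming a hypothetical local publisher and the UTF-8 converter style above; ssc is an existing StreamingContext:

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // The endpoint and topic are placeholders; Subscribe(ByteString(...))
    // mirrors the constructor exercised in JavaZeroMQStreamSuite below.
    val lines = ZeroMQUtils.createStream(
      ssc, "tcp://127.0.0.1:5555", Subscribe(ByteString("events")),
      (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator)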
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index dc27178149..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object zeromq {
- implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
-}
-
-
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index b020ae4cef..d2361e14b8 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,13 +17,10 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
-
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
-
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+ JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ SupervisorStrategy.defaultStrategy());
}
}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 5adcdb821f..4193b8a02f 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
- val test2 = ssc.zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+ val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2 = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving