author     Tathagata Das <tathagata.das1565@gmail.com>    2014-01-07 01:56:15 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-01-07 01:56:15 -0800
commit     aa99f226a691ddcb4442d60f4cd4908f434cc4ce
tree       33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/mqtt
parent     3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
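In practical terms, this replaces the implicit-conversion style (ssc.mqttStream(...), brought in by importing org.apache.spark.streaming.mqtt._) with an explicit MQTTUtils.createStream(...) call. A minimal Scala sketch of the new MQTT call pattern; the broker URL, topic, master, and application name below are purely illustrative, not part of this commit:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTUtilsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "MQTTUtilsSketch", Seconds(1))

    // Old style (removed by this commit): ssc.mqttStream(brokerUrl, topic),
    // made available through the implicit conversion in the mqtt package object.

    // New style: one utility object shared by the Scala and Java APIs.
    val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors")
    val persisted = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors",
      StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.print()
    ssc.start()
  }
}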
Diffstat (limited to 'external/mqtt')
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala | 43
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala) | 54
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala | 24
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 6
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4
5 files changed, 39 insertions, 92 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
deleted file mode 100644
index 86f4e9c724..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-
-/**
- * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions.
- */
-class MQTTFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- */
- def mqttStream(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
- }
-}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 72124956fc..0e6c25dbee 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -15,45 +15,61 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.mqtt
-
-import scala.reflect.ClassTag
+package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.mqtt._
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import scala.reflect.ClassTag
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating MQTT input streams.
- */
-class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
+object MQTTUtils {
+ /**
+ * Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param ssc StreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
+ * @param jssc JavaStreamingContext object
* @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
+ * @param topic Topic name to subscribe to
*/
- def mqttStream(
+ def createStream(
+ jssc: JavaStreamingContext,
brokerUrl: String,
topic: String
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
+ createStream(jssc.ssc, brokerUrl, topic)
}
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
- * @param brokerUrl Url of remote MQTT publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
+ * @param jssc JavaStreamingContext object
+ * @param brokerUrl Url of remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level.
*/
- def mqttStream(
+ def createStream(
+ jssc: JavaStreamingContext,
brokerUrl: String,
topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ storageLevel: StorageLevel
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
+ createStream(jssc.ssc, brokerUrl, topic, storageLevel)
}
}
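Note that the Java-facing createStream overloads above take an explicit StorageLevel rather than a default argument, presumably because Scala default parameter values are not visible to Java callers. A short sketch of how those overloads are exercised; the jssc parameter, broker URL, and topic below are illustrative assumptions, not from this commit:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.mqtt.MQTTUtils

// Sketch only: the Java overloads unwrap jssc.ssc and delegate to the Scala
// createStream, so both entry points build the same MQTTInputDStream.
object JavaApiSketch {
  def createStreams(jssc: JavaStreamingContext): Unit = {
    val withDefaultStorage = MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors")
    val withExplicitStorage = MQTTUtils.createStream(jssc, "tcp://broker:1883", "sensors",
      StorageLevel.MEMORY_AND_DISK_SER_2)
  }
}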
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 28a944f57e..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object mqtt {
- implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc)
-}
-
-
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 3ddb4d084f..44743aaecf 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -29,11 +28,10 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index ab6542918b..fcc159e85a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -28,8 +28,8 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = ssc.mqttStream(brokerUrl, topic)
- val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}