diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-07 01:56:15 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-07 01:56:15 -0800 |
commit | aa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch) | |
tree | 33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/mqtt | |
parent | 3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff) | |
download | spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2 spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip |
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/mqtt')
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala | 43 | ||||
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala (renamed from external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala) | 54 | ||||
-rw-r--r-- | external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala | 24 | ||||
-rw-r--r-- | external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 6 | ||||
-rw-r--r-- | external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 4 |
5 files changed, 39 insertions, 92 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala deleted file mode 100644 index 86f4e9c724..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. - */ -class MQTTFunctions(ssc: StreamingContext) { - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - */ - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[String] = { - val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 72124956fc..0e6c25dbee 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -15,45 +15,61 @@ * limitations under the License. */ -package org.apache.spark.streaming.api.java.mqtt - -import scala.reflect.ClassTag +package org.apache.spark.streaming.mqtt import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.mqtt._ +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import scala.reflect.ClassTag -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating MQTT input streams. - */ -class MQTTFunctions(javaStreamingContext: JavaStreamingContext) { +object MQTTUtils { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createStream( + ssc: StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } /** * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to + * @param topic Topic name to subscribe to */ - def mqttStream( + def createStream( + jssc: JavaStreamingContext, brokerUrl: String, topic: String ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.mqttStream(brokerUrl, topic) + createStream(jssc.ssc, brokerUrl, topic) } /** * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. */ - def mqttStream( + def createStream( + jssc: JavaStreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + storageLevel: StorageLevel ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel) + createStream(jssc.ssc, brokerUrl, topic, storageLevel) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala deleted file mode 100644 index 28a944f57e..0000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object mqtt { - implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) -} - - diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index 3ddb4d084f..44743aaecf 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions; import org.junit.Test; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -29,11 +28,10 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { public void testMQTTStream() { String brokerUrl = "abc"; String topic = "def"; - MQTTFunctions mqttFunc = new MQTTFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic); - JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic, + JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); + JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index ab6542918b..fcc159e85a 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -28,8 +28,8 @@ class MQTTStreamSuite extends TestSuiteBase { val topic = "def" // tests the API, does not actually test data receiving - val test1 = ssc.mqttStream(brokerUrl, topic) - val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic) + val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) // TODO: Actually test receiving data } |