aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorprabeesh <prabsmails@gmail.com>2013-10-16 13:35:29 +0530
committerprabeesh <prabsmails@gmail.com>2013-10-16 13:35:29 +0530
commit742ada91e0415726b865029b1c9e9e1d6ab2ecb8 (patch)
tree046b8e83f84b1015bcb6c35c44eff39870dbce56 /streaming
parenta106ed8b97e707b36818c11d1d7211fa28636178 (diff)
downloadspark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.tar.gz
spark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.tar.bz2
spark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.zip
mqttinputdstream for mqttstreaming adapter
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala111
1 files changed, 111 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
new file mode 100644
index 0000000000..3416989c02
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext }
+
+import java.util.Properties
+import java.util.concurrent.Executors
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttTopic;
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions._
+
+/**
+ * Input stream that subscribe messages from a Mqtt Broker.
+ * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ */
+
+private[streaming] class MQTTInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ brokerUrl: String,
+ topic: String,
+
+ storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ new MQTTReceiver(brokerUrl, topic, storageLevel)
+ .asInstanceOf[NetworkReceiver[T]]
+ }
+}
+
+private[streaming] class MQTTReceiver(brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel) extends NetworkReceiver[Any] {
+
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+
+ def onStop() {
+ blockGenerator.stop()
+ }
+
+ def onStart() {
+
+ blockGenerator.start()
+
+ //Set up persistence for messages
+ var peristance:MqttClientPersistence =new MemoryPersistence();
+
+ //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
+ var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance);
+
+ //Connect to MqttBroker
+ client.connect();
+
+ //Subscribe to Mqtt topic
+ client.subscribe(topic);
+
+ //Callback automatically triggers as and when new message arrives on specified topic
+ var callback: MqttCallback = new MqttCallback() {
+
+ //Handles Mqtt message
+ override def messageArrived(arg0: String, arg1: MqttMessage) {
+ blockGenerator += new String(arg1.getPayload())
+ }
+
+ override def deliveryComplete(arg0: IMqttDeliveryToken) {
+ }
+
+ override def connectionLost(arg0: Throwable) {
+ System.err.println("Connection lost " + arg0)
+
+ }
+
+ };
+
+ //Set up callback for MqttClient
+ client.setCallback(callback);
+
+ }
+
+}