diff options
author | prabeesh <prabsmails@gmail.com> | 2013-10-16 13:35:29 +0530 |
---|---|---|
committer | prabeesh <prabsmails@gmail.com> | 2013-10-16 13:35:29 +0530 |
commit | 742ada91e0415726b865029b1c9e9e1d6ab2ecb8 (patch) | |
tree | 046b8e83f84b1015bcb6c35c44eff39870dbce56 | |
parent | a106ed8b97e707b36818c11d1d7211fa28636178 (diff) | |
download | spark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.tar.gz spark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.tar.bz2 spark-742ada91e0415726b865029b1c9e9e1d6ab2ecb8.zip |
mqttinputdstream for mqttstreaming adapter
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala new file mode 100644 index 0000000000..3416989c02 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException; + +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] class MQTTInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + + storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel) extends NetworkReceiver[Any] { + + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + //Set up persistence for messages + var peristance:MqttClientPersistence =new MemoryPersistence(); + + //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + + //Connect to MqttBroker + client.connect(); + + //Subscribe to Mqtt topic + client.subscribe(topic); + + //Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + //Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + System.err.println("Connection lost " + arg0) + + } + + }; + + //Set up callback for MqttClient + client.setCallback(callback); + + } + +} |