From 742ada91e0415726b865029b1c9e9e1d6ab2ecb8 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:35:29 +0530 Subject: mqttinputdstream for mqttstreaming adapter --- .../spark/streaming/dstream/MQTTInputDStream.scala | 111 +++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala new file mode 100644 index 0000000000..3416989c02 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } + +import java.util.Properties +import java.util.concurrent.Executors +import java.io.IOException; + +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + +/** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + +private[streaming] class MQTTInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + + storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel) extends NetworkReceiver[Any] { + + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + //Set up persistence for messages + var peristance:MqttClientPersistence =new MemoryPersistence(); + + //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + + //Connect to MqttBroker + client.connect(); + + //Subscribe to Mqtt topic + client.subscribe(topic); + + //Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + //Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + System.err.println("Connection lost " + arg0) + + } + + }; + + //Set up callback for MqttClient + client.setCallback(callback); + + } + +} -- cgit v1.2.3 From 2e48b23eae6cfc5e7c825573e2739e54c4569045 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:36:25 +0530 Subject: added mqtt adapter --- .../org/apache/spark/streaming/StreamingContext.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705..8ed5dfb99b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -450,6 +450,21 @@ class StreamingContext private ( inputStream } +/** + * Create an input stream that receives messages pushed by a mqtt publisher. + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + + def mqttStream( + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { + val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) + registerInputStream(inputStream) + inputStream + } /** * Create a unified DStream from multiple DStreams of the same type and same interval */ -- cgit v1.2.3 From 9a7575728d41485407821ea450d52c3a51687de5 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:41:49 +0530 Subject: add maven dependencies for mqtt --- streaming/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'streaming') diff --git a/streaming/pom.xml b/streaming/pom.xml index 7bea069b61..c793f207c6 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -109,6 +109,11 @@ slf4j-log4j12 test + + org.eclipse.paho + mqtt-client + 0.4.0 + target/scala-${scala.version}/classes -- cgit v1.2.3 From 890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Thu, 17 Oct 2013 10:00:40 +0530 Subject: modify code, use Spark Logging Class --- .../spark/streaming/dstream/MQTTInputDStream.scala | 61 +++++++++------------- 1 file changed, 26 insertions(+), 35 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 3416989c02..9b3fe67e6a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -23,16 +23,16 @@ import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContex import java.util.Properties import java.util.concurrent.Executors -import java.io.IOException; +import java.io.IOException -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic import scala.collection.Map import scala.collection.mutable.HashMap @@ -50,9 +50,7 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { - def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] @@ -62,50 +60,43 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - def onStop() { blockGenerator.stop() } - def onStart() { blockGenerator.start() - //Set up persistence for messages - var peristance:MqttClientPersistence =new MemoryPersistence(); + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + // Connect to MqttBroker + client.connect() - //Connect to MqttBroker - client.connect(); - - //Subscribe to Mqtt topic - client.subscribe(topic); - - //Callback automatically triggers as and when new message arrives on specified topic + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - //Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } - + override def deliveryComplete(arg0: IMqttDeliveryToken) { } override def connectionLost(arg0: Throwable) { - System.err.println("Connection lost " + arg0) - + logInfo("Connection lost " + arg0) } + } - }; - - //Set up callback for MqttClient - client.setCallback(callback); - + // Set up callback for MqttClient + client.setCallback(callback) } - } -- cgit v1.2.3 From d223d38933b440df372dce38c6f4181586011c9e Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 18 Oct 2013 09:09:49 +0530 Subject: Update MQTTInputDStream.scala --- .../apache/spark/streaming/dstream/MQTTInputDStream.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 9b3fe67e6a..ac0528213d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -46,24 +46,31 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. */ -private[streaming] class MQTTInputDStream[T: ClassManifest]( +private[streaming] +class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } -private[streaming] class MQTTReceiver(brokerUrl: String, +private[streaming] +class MQTTReceiver(brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkReceiver[Any] { + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) + def onStop() { blockGenerator.stop() } + def onStart() { blockGenerator.start() -- cgit v1.2.3