aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorSandeep <sandeep@techaddict.me>2014-04-10 15:04:13 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-10 15:04:13 -0700
commit930b70f0523e96fe01c1317ef7fad1b76b36d4d9 (patch)
treefba70b8897f6c5ae1123e4717d8efdb4d4b0acc4 /external
parentf0466625200842f3cc486e9aa1caa417586be533 (diff)
downloadspark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.gz
spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.tar.bz2
spark-930b70f0523e96fe01c1317ef7fad1b76b36d4d9.zip
Remove Unnecessary Whitespace's
stack these together in a commit else they show up chunk by chunk in different commits. Author: Sandeep <sandeep@techaddict.me> Closes #380 from techaddict/white_space and squashes the following commits: b58f294 [Sandeep] Remove Unnecessary Whitespace's
Diffstat (limited to 'external')
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala16
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala6
2 files changed, 11 insertions, 11 deletions
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 41e813d48c..1204cfba39 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._
* @param storageLevel RDD storage level.
*/
-private[streaming]
+private[streaming]
class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
-
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
+
def onStop() {
blockGenerator.stop()
}
-
+
def onStart() {
blockGenerator.start()
- // Set up persistence for messages
+ // Set up persistence for messages
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
- // Connect to MqttBroker
+ // Connect to MqttBroker
client.connect()
// Subscribe to Mqtt topic
@@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String,
// Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
- // Handles Mqtt message
+ // Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 3316b6dc39..843a4a7a9a 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
-*
+*
* If no Authorization object is provided, initializes OAuth authorization using the system
* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
*/
@@ -42,13 +42,13 @@ class TwitterInputDStream(
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
-
+
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
}
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
+
override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}