aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala99
1 files changed, 99 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
new file mode 100644
index 0000000000..387e15b0e6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.dstream
+
+import org.apache.spark._
+import org.apache.spark.streaming._
+import storage.StorageLevel
+import twitter4j._
+import twitter4j.auth.Authorization
+import java.util.prefs.Preferences
+import twitter4j.conf.ConfigurationBuilder
+import twitter4j.conf.PropertyConfiguration
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.auth.AccessToken
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*
+* If no Authorization object is provided, initializes OAuth authorization using the system
+* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
+*/
+private[streaming]
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ private def createOAuthAuthorization(): Authorization = {
+ new OAuthAuthorization(new ConfigurationBuilder().build())
+ }
+
+ private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
+
+ override def getReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(authorization, filters, storageLevel)
+ }
+}
+
+private[streaming]
+class TwitterReceiver(
+ twitterAuth: Authorization,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ twitterStream.shutdown()
+ logInfo("Twitter receiver stopped")
+ }
+}