aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
commit6e43039614ed1ec55a134fb82fb3e8d4e80996ef (patch)
tree14b2325558807a1a0f82a79707c2d2d975f008b7 /external
parente240bad03b9f9e19cb84b0914b729c8d109d4815 (diff)
downloadspark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.gz
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.bz2
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.zip
Refactored streaming project to separate out the twitter functionality.
Diffstat (limited to 'external')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala27
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala101
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala7
3 files changed, 135 insertions, 0 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
new file mode 100644
index 0000000000..fe66e28ce6
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+
+class StreamingContextWithTwitter(ssc: StreamingContext) {
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Option[Authorization] = None,
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
new file mode 100644
index 0000000000..97e48ebeca
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.twitter
+
+import java.util.prefs.Preferences
+import twitter4j._
+import twitter4j.auth.Authorization
+import twitter4j.conf.ConfigurationBuilder
+import twitter4j.conf.PropertyConfiguration
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.auth.AccessToken
+import org.apache.spark._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.storage.StorageLevel
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*
+* If no Authorization object is provided, initializes OAuth authorization using the system
+* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
+*/
+private[streaming]
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ twitterAuth: Option[Authorization],
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ private def createOAuthAuthorization(): Authorization = {
+ new OAuthAuthorization(new ConfigurationBuilder().build())
+ }
+
+ private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
+
+ override def getReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(authorization, filters, storageLevel)
+ }
+}
+
+private[streaming]
+class TwitterReceiver(
+ twitterAuth: Authorization,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ twitterStream.shutdown()
+ logInfo("Twitter receiver stopped")
+ }
+}
+
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
new file mode 100644
index 0000000000..89c202a730
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
@@ -0,0 +1,7 @@
+package org.apache.spark.streaming
+
+package object twitter {
+ implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = {
+ new StreamingContextWithTwitter(ssc)
+ }
+}