diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 18:02:49 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 18:02:49 -0800 |
commit | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (patch) | |
tree | 14b2325558807a1a0f82a79707c2d2d975f008b7 /streaming | |
parent | e240bad03b9f9e19cb84b0914b729c8d109d4815 (diff) | |
download | spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.gz spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.bz2 spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.zip |
Refactored streaming project to separate out the twitter functionality.
Diffstat (limited to 'streaming')
3 files changed, 8 insertions, 105 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3c..25b9b70b2c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,8 +45,8 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path -import twitter4j.Status -import twitter4j.auth.Authorization +//import twitter4j.Status +//import twitter4j.auth.Authorization import org.apache.spark.streaming.scheduler._ import akka.util.ByteString @@ -414,6 +414,7 @@ class StreamingContext private ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } + /* /** * Create a input stream that returns tweets received from Twitter. * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth @@ -431,6 +432,7 @@ class StreamingContext private ( registerInputStream(inputStream) inputStream } + */ /** * Create an input stream from a queue of RDDs. In each batch, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 78d318cf27..b32cfbb677 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -25,13 +25,13 @@ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import twitter4j.Status +//import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe import akka.util.ByteString -import twitter4j.auth.Authorization +//import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -338,7 +338,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { ssc.flumeStream(hostname, port) } - + /* /** * Create a input stream that returns tweets received from Twitter. * @param twitterAuth Twitter4J Authorization object @@ -409,7 +409,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def twitterStream(): JavaDStream[Status] = { ssc.twitterStream() } - + */ /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala deleted file mode 100644 index 387e15b0e6..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark._ -import org.apache.spark.streaming._ -import storage.StorageLevel -import twitter4j._ -import twitter4j.auth.Authorization -import java.util.prefs.Preferences -import twitter4j.conf.ConfigurationBuilder -import twitter4j.conf.PropertyConfiguration -import twitter4j.auth.OAuthAuthorization -import twitter4j.auth.AccessToken - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -* -* If no Authorization object is provided, initializes OAuth authorization using the system -* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. -*/ -private[streaming] -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { - - private def createOAuthAuthorization(): Authorization = { - new OAuthAuthorization(new ConfigurationBuilder().build()) - } - - private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - - override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(authorization, filters, storageLevel) - } -} - -private[streaming] -class TwitterReceiver( - twitterAuth: Authorization, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { - - var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - blockGenerator.start() - twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - blockGenerator += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { stopOnError(e) } - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - logInfo("Twitter receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - twitterStream.shutdown() - logInfo("Twitter receiver stopped") - } -} |