diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 14:36:09 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 14:36:09 -0700 |
commit | 16671585445f59225a0aab65ff61ed099f3f765e (patch) | |
tree | b1b6fa3a407c80f28cf850f5eebea0c355c2a209 /streaming | |
parent | 50ca17635a904f9496ccf996cd2f90325168bb9b (diff) | |
parent | 176193b1e8acdbe2f1cfaed16b8f42f89e226f79 (diff) | |
download | spark-16671585445f59225a0aab65ff61ed099f3f765e.tar.gz spark-16671585445f59225a0aab65ff61ed099f3f765e.tar.bz2 spark-16671585445f59225a0aab65ff61ed099f3f765e.zip |
Merge remote-tracking branch 'mrpotes/master'
Diffstat (limited to 'streaming')
3 files changed, 85 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index f2c4073f22..e61438fe3a 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import twitter4j.Status +import twitter4j.auth.Authorization /** @@ -380,18 +381,16 @@ class StreamingContext private ( /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J authentication * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - username: String, - password: String, + twitterAuth: Option[Authorization] = None, filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 4259d4891c..c4a223b419 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -4,23 +4,18 @@ import spark.streaming._ import receivers.{ActorReceiver, ReceiverSupervisorStrategy} import spark.streaming.dstream._ import spark.storage.StorageLevel - import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import spark.api.java.{JavaSparkContext, JavaRDD} - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - import twitter4j.Status - import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe - import scala.collection.JavaConversions._ - import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream import java.util.{Map => JMap} +import twitter4j.auth.Authorization /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -312,47 +307,79 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters, storageLevel) + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters) + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization */ def twitterStream( - username: String, - password: String + twitterAuth: Authorization ): JavaDStream[Status] = { - ssc.twitterStream(username, password) + ssc.twitterStream(Some(twitterAuth)) } /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream() + } + + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index c697498862..e0c654d385 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -3,34 +3,59 @@ package spark.streaming.dstream import spark._ import spark.streaming._ import storage.StorageLevel - import twitter4j._ import twitter4j.auth.BasicAuthorization +import twitter4j.auth.Authorization +import java.util.prefs.Preferences +import twitter4j.conf.PropertyConfiguration +import twitter4j.auth.OAuthAuthorization +import twitter4j.auth.AccessToken /* A stream of Twitter statuses, potentially filtered by one or more keywords. * * @constructor create a new Twitter stream using the supplied username and password to authenticate. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. +* +* Includes a simple implementation of OAuth using consumer key and secret provided using system +* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret */ private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, - username: String, - password: String, + twitterAuth: Option[Authorization], filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - + + lazy val createOAuthAuthorization: Authorization = { + val userRoot = Preferences.userRoot(); + val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null)) + val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null)) + val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties())) + if (token.isEmpty || tokenSecret.isEmpty) { + val requestToken = oAuth.getOAuthRequestToken() + println("Authorize application using URL: "+requestToken.getAuthorizationURL()) + println("Enter PIN: ") + val pin = Console.readLine + val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken() + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken()) + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret()) + userRoot.flush() + } else { + oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get)); + } + oAuth + } + override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) + new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel) } } private[streaming] class TwitterReceiver( - username: String, - password: String, + twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel ) extends NetworkReceiver[Status] { @@ -40,8 +65,7 @@ class TwitterReceiver( protected override def onStart() { blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) + twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) twitterStream.addListener(new StatusListener { def onStatus(status: Status) = { blockGenerator += status |