diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 15:25:06 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 15:25:06 -0700 |
commit | 4358acfe07e991090fbe009aafe3f5110fbf0c40 (patch) | |
tree | 014da721b652f286e935fdd3ba791b6bea044c99 /streaming/src/main | |
parent | 16671585445f59225a0aab65ff61ed099f3f765e (diff) | |
download | spark-4358acfe07e991090fbe009aafe3f5110fbf0c40.tar.gz spark-4358acfe07e991090fbe009aafe3f5110fbf0c40.tar.bz2 spark-4358acfe07e991090fbe009aafe3f5110fbf0c40.zip |
Initialize Twitter4J OAuth from system properties instead of prompting
Diffstat (limited to 'streaming/src/main')
3 files changed, 22 insertions, 37 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index e61438fe3a..36b841af8f 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -381,7 +381,9 @@ class StreamingContext private ( /** * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret. * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index c4a223b419..ed7b789d98 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -307,7 +307,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization + * @param twitterAuth Twitter4J Authorization object * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ @@ -320,10 +320,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream that returns tweets received from Twitter using - * java.util.Preferences to store OAuth token. OAuth key and secret should - * be provided using system properties twitter4j.oauth.consumerKey and - * twitter4j.oauth.consumerSecret + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ @@ -347,10 +346,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream that returns tweets received from Twitter using - * java.util.Preferences to store OAuth token. OAuth key and secret should - * be provided using system properties twitter4j.oauth.consumerKey and - * twitter4j.oauth.consumerSecret + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream( @@ -370,10 +368,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create a input stream that returns tweets received from Twitter using - * java.util.Preferences to store OAuth token. OAuth key and secret should - * be provided using system properties twitter4j.oauth.consumerKey and - * twitter4j.oauth.consumerSecret + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. */ def twitterStream(): JavaDStream[Status] = { ssc.twitterStream() diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index e0c654d385..ff7a58be45 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -4,21 +4,21 @@ import spark._ import spark.streaming._ import storage.StorageLevel import twitter4j._ -import twitter4j.auth.BasicAuthorization import twitter4j.auth.Authorization import java.util.prefs.Preferences +import twitter4j.conf.ConfigurationBuilder import twitter4j.conf.PropertyConfiguration import twitter4j.auth.OAuthAuthorization import twitter4j.auth.AccessToken /* A stream of Twitter statuses, potentially filtered by one or more keywords. * -* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. * -* Includes a simple implementation of OAuth using consumer key and secret provided using system -* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret +* If no Authorization object is provided, initializes OAuth authorization using the system +* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. */ private[streaming] class TwitterInputDStream( @@ -28,28 +28,14 @@ class TwitterInputDStream( storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - lazy val createOAuthAuthorization: Authorization = { - val userRoot = Preferences.userRoot(); - val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null)) - val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null)) - val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties())) - if (token.isEmpty || tokenSecret.isEmpty) { - val requestToken = oAuth.getOAuthRequestToken() - println("Authorize application using URL: "+requestToken.getAuthorizationURL()) - println("Enter PIN: ") - val pin = Console.readLine - val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken() - userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken()) - userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret()) - userRoot.flush() - } else { - oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get)); - } - oAuth + private def createOAuthAuthorization(): Authorization = { + new OAuthAuthorization(new ConfigurationBuilder().build()) } + + private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel) + new TwitterReceiver(authorization, filters, storageLevel) } } |