aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-29 15:25:06 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-29 15:25:06 -0700
commit4358acfe07e991090fbe009aafe3f5110fbf0c40 (patch)
tree014da721b652f286e935fdd3ba791b6bea044c99 /streaming
parent16671585445f59225a0aab65ff61ed099f3f765e (diff)
downloadspark-4358acfe07e991090fbe009aafe3f5110fbf0c40.tar.gz
spark-4358acfe07e991090fbe009aafe3f5110fbf0c40.tar.bz2
spark-4358acfe07e991090fbe009aafe3f5110fbf0c40.zip
Initialize Twitter4J OAuth from system properties instead of prompting
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala23
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala32
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java2
4 files changed, 23 insertions, 38 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index e61438fe3a..36b841af8f 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -381,7 +381,9 @@ class StreamingContext private (
/**
* Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index c4a223b419..ed7b789d98 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -307,7 +307,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J Authorization
+ * @param twitterAuth Twitter4J Authorization object
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
@@ -320,10 +320,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream that returns tweets received from Twitter using
- * java.util.Preferences to store OAuth token. OAuth key and secret should
- * be provided using system properties twitter4j.oauth.consumerKey and
- * twitter4j.oauth.consumerSecret
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
@@ -347,10 +346,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream that returns tweets received from Twitter using
- * java.util.Preferences to store OAuth token. OAuth key and secret should
- * be provided using system properties twitter4j.oauth.consumerKey and
- * twitter4j.oauth.consumerSecret
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(
@@ -370,10 +368,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream that returns tweets received from Twitter using
- * java.util.Preferences to store OAuth token. OAuth key and secret should
- * be provided using system properties twitter4j.oauth.consumerKey and
- * twitter4j.oauth.consumerSecret
+ * Create a input stream that returns tweets received from Twitter using Twitter4J's default
+ * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret to be set.
*/
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index e0c654d385..ff7a58be45 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -4,21 +4,21 @@ import spark._
import spark.streaming._
import storage.StorageLevel
import twitter4j._
-import twitter4j.auth.BasicAuthorization
import twitter4j.auth.Authorization
import java.util.prefs.Preferences
+import twitter4j.conf.ConfigurationBuilder
import twitter4j.conf.PropertyConfiguration
import twitter4j.auth.OAuthAuthorization
import twitter4j.auth.AccessToken
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
-* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
*
-* Includes a simple implementation of OAuth using consumer key and secret provided using system
-* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret
+* If no Authorization object is provided, initializes OAuth authorization using the system
+* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
*/
private[streaming]
class TwitterInputDStream(
@@ -28,28 +28,14 @@ class TwitterInputDStream(
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
- lazy val createOAuthAuthorization: Authorization = {
- val userRoot = Preferences.userRoot();
- val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null))
- val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null))
- val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties()))
- if (token.isEmpty || tokenSecret.isEmpty) {
- val requestToken = oAuth.getOAuthRequestToken()
- println("Authorize application using URL: "+requestToken.getAuthorizationURL())
- println("Enter PIN: ")
- val pin = Console.readLine
- val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken()
- userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken())
- userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret())
- userRoot.flush()
- } else {
- oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get));
- }
- oAuth
+ private def createOAuthAuthorization(): Authorization = {
+ new OAuthAuthorization(new ConfigurationBuilder().build())
}
+
+ private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
override def getReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel)
+ new TwitterReceiver(authorization, filters, storageLevel)
}
}
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index e5fdbe1b7a..4cf10582a9 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -1267,7 +1267,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTwitterStream() {
String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+ JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
}
@Test