aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-29 14:36:09 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-29 14:36:09 -0700
commit16671585445f59225a0aab65ff61ed099f3f765e (patch)
treeb1b6fa3a407c80f28cf850f5eebea0c355c2a209
parent50ca17635a904f9496ccf996cd2f90325168bb9b (diff)
parent176193b1e8acdbe2f1cfaed16b8f42f89e226f79 (diff)
downloadspark-16671585445f59225a0aab65ff61ed099f3f765e.tar.gz
spark-16671585445f59225a0aab65ff61ed099f3f765e.tar.bz2
spark-16671585445f59225a0aab65ff61ed099f3f765e.zip
Merge remote-tracking branch 'mrpotes/master'
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala9
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala9
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala9
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala9
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala69
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala42
6 files changed, 97 insertions, 50 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index a9642100e3..528778ed72 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -26,8 +26,8 @@ import spark.SparkContext._
*/
object TwitterAlgebirdCMS {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdCMS <master>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
@@ -40,12 +40,11 @@ object TwitterAlgebirdCMS {
// K highest frequency elements to take
val TOPK = 10
- val Array(master, username, password) = args.slice(0, 3)
- val filters = args.slice(3, args.length)
+ val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
index f3288bfb85..896e9fd8af 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,20 +21,19 @@ import spark.streaming.dstream.TwitterInputDStream
*/
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterAlgebirdHLL <master>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
- val Array(master, username, password) = args.slice(0, 3)
- val filters = args.slice(3, args.length)
+ val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 9d4494c6f2..65f0b6d352 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -12,18 +12,17 @@ import spark.SparkContext._
*/
object TwitterPopularTags {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
+ if (args.length < 1) {
+ System.err.println("Usage: TwitterPopularTags <master>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
- val Array(master, username, password) = args.slice(0, 3)
- val filters = args.slice(3, args.length)
+ val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(username, password, filters)
+ val stream = ssc.twitterStream(None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index f2c4073f22..e61438fe3a 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
+import twitter4j.auth.Authorization
/**
@@ -380,18 +381,16 @@ class StreamingContext private (
/**
* Create a input stream that returns tweets received from Twitter.
- * @param username Twitter username
- * @param password Twitter password
+ * @param twitterAuth Twitter4J authentication
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
- username: String,
- password: String,
+ twitterAuth: Option[Authorization] = None,
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
- val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
registerInputStream(inputStream)
inputStream
}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 4259d4891c..c4a223b419 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -4,23 +4,18 @@ import spark.streaming._
import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
import spark.streaming.dstream._
import spark.storage.StorageLevel
-
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import spark.api.java.{JavaSparkContext, JavaRDD}
-
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
import twitter4j.Status
-
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
-
import scala.collection.JavaConversions._
-
import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap}
+import twitter4j.auth.Authorization
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -312,47 +307,79 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create a input stream that returns tweets received from Twitter.
- * @param username Twitter username
- * @param password Twitter password
+ * @param twitterAuth Twitter4J Authorization
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
- username: String,
- password: String,
+ twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
- ssc.twitterStream(username, password, filters, storageLevel)
+ ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using
+ * java.util.Preferences to store OAuth token. OAuth key and secret should
+ * be provided using system properties twitter4j.oauth.consumerKey and
+ * twitter4j.oauth.consumerSecret
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters, storageLevel)
}
/**
* Create a input stream that returns tweets received from Twitter.
- * @param username Twitter username
- * @param password Twitter password
+ * @param twitterAuth Twitter4J Authorization
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(
- username: String,
- password: String,
+ twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
- ssc.twitterStream(username, password, filters)
+ ssc.twitterStream(Some(twitterAuth), filters)
+ }
+
+ /**
+ * Create a input stream that returns tweets received from Twitter using
+ * java.util.Preferences to store OAuth token. OAuth key and secret should
+ * be provided using system properties twitter4j.oauth.consumerKey and
+ * twitter4j.oauth.consumerSecret
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(None, filters)
}
/**
* Create a input stream that returns tweets received from Twitter.
- * @param username Twitter username
- * @param password Twitter password
+ * @param twitterAuth Twitter4J Authorization
*/
def twitterStream(
- username: String,
- password: String
+ twitterAuth: Authorization
): JavaDStream[Status] = {
- ssc.twitterStream(username, password)
+ ssc.twitterStream(Some(twitterAuth))
}
/**
+ * Create a input stream that returns tweets received from Twitter using
+ * java.util.Preferences to store OAuth token. OAuth key and secret should
+ * be provided using system properties twitter4j.oauth.consumerKey and
+ * twitter4j.oauth.consumerSecret
+ */
+ def twitterStream(): JavaDStream[Status] = {
+ ssc.twitterStream()
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index c697498862..e0c654d385 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -3,34 +3,59 @@ package spark.streaming.dstream
import spark._
import spark.streaming._
import storage.StorageLevel
-
import twitter4j._
import twitter4j.auth.BasicAuthorization
+import twitter4j.auth.Authorization
+import java.util.prefs.Preferences
+import twitter4j.conf.PropertyConfiguration
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.auth.AccessToken
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
* @constructor create a new Twitter stream using the supplied username and password to authenticate.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
+*
+* Includes a simple implementation of OAuth using consumer key and secret provided using system
+* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret
*/
private[streaming]
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
- username: String,
- password: String,
+ twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
-
+
+ lazy val createOAuthAuthorization: Authorization = {
+ val userRoot = Preferences.userRoot();
+ val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null))
+ val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null))
+ val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties()))
+ if (token.isEmpty || tokenSecret.isEmpty) {
+ val requestToken = oAuth.getOAuthRequestToken()
+ println("Authorize application using URL: "+requestToken.getAuthorizationURL())
+ println("Enter PIN: ")
+ val pin = Console.readLine
+ val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken()
+ userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken())
+ userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret())
+ userRoot.flush()
+ } else {
+ oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get));
+ }
+ oAuth
+ }
+
override def getReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(username, password, filters, storageLevel)
+ new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel)
}
}
private[streaming]
class TwitterReceiver(
- username: String,
- password: String,
+ twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkReceiver[Status] {
@@ -40,8 +65,7 @@ class TwitterReceiver(
protected override def onStart() {
blockGenerator.start()
- twitterStream = new TwitterStreamFactory()
- .getInstance(new BasicAuthorization(username, password))
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
blockGenerator += status