From 93a1643405d7c1a1fffe8210130341f34d64ea72 Mon Sep 17 00:00:00 2001 From: James Phillpotts Date: Fri, 21 Jun 2013 14:21:52 +0100 Subject: Allow other twitter authorizations than username/password --- .../src/main/scala/spark/streaming/StreamingContext.scala | 15 ++++++++++++++- .../spark/streaming/dstream/TwitterInputDStream.scala | 14 ++++++-------- 2 files changed, 20 insertions(+), 9 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index b8b60aab43..f97e47ada0 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import twitter4j.Status +import twitter4j.auth.{Authorization, BasicAuthorization} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -372,8 +373,20 @@ class StreamingContext private ( password: String, filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = twitterStream(new BasicAuthorization(username, password), filters, storageLevel) + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index c697498862..0b01091a52 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -6,6 +6,7 @@ import storage.StorageLevel import twitter4j._ import twitter4j.auth.BasicAuthorization +import twitter4j.auth.Authorization /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -16,21 +17,19 @@ import twitter4j.auth.BasicAuthorization private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, - username: String, - password: String, + twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - + override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) + new TwitterReceiver(twitterAuth, filters, storageLevel) } } private[streaming] class TwitterReceiver( - username: String, - password: String, + twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel ) extends NetworkReceiver[Status] { @@ -40,8 +39,7 @@ class TwitterReceiver( protected override def onStart() { blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) + twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) twitterStream.addListener(new StatusListener { def onStatus(status: Status) = { blockGenerator += status -- cgit v1.2.3 From 8955787a596216a35ad4ec52b57331aa40444bef Mon Sep 17 00:00:00 2001 From: James Phillpotts Date: Mon, 24 Jun 2013 09:15:17 +0100 Subject: Twitter API v1 is retired - username/password auth no longer possible --- .../main/scala/spark/streaming/StreamingContext.scala | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index f97e47ada0..05be6bd58a 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import twitter4j.Status -import twitter4j.auth.{Authorization, BasicAuthorization} +import twitter4j.auth.Authorization /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -361,20 +361,6 @@ class StreamingContext private ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } - /** - * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - username: String, - password: String, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = twitterStream(new BasicAuthorization(username, password), filters, storageLevel) - /** * Create a input stream that returns tweets received from Twitter. * @param twitterAuth Twitter4J authentication -- cgit v1.2.3 From 366572edcab87701fd795ca0142ac9829b312d36 Mon Sep 17 00:00:00 2001 From: James Phillpotts Date: Tue, 25 Jun 2013 22:59:34 +0100 Subject: Include a default OAuth implementation, and update examples and JavaStreamingContext --- .../streaming/examples/TwitterAlgebirdCMS.scala | 2 +- .../streaming/examples/TwitterAlgebirdHLL.scala | 2 +- .../streaming/examples/TwitterPopularTags.scala | 2 +- .../scala/spark/streaming/StreamingContext.scala | 2 +- .../streaming/api/java/JavaStreamingContext.scala | 69 +++++++++++++++------- .../streaming/dstream/TwitterInputDStream.scala | 32 +++++++++- 6 files changed, 81 insertions(+), 28 deletions(-) (limited to 'streaming/src') diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index a9642100e3..548190309e 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -45,7 +45,7 @@ object TwitterAlgebirdCMS { val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index f3288bfb85..5a86c6318d 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -34,7 +34,7 @@ object TwitterAlgebirdHLL { val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index 9d4494c6f2..076c3878c8 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -23,7 +23,7 @@ object TwitterPopularTags { val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - val stream = ssc.twitterStream(username, password, filters) + val stream = ssc.twitterStream(None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 05be6bd58a..0f36504c0d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -368,7 +368,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - twitterAuth: Authorization, + twitterAuth: Option[Authorization] = None, filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 3d149a742c..85390ef57e 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -4,23 +4,18 @@ import spark.streaming._ import receivers.{ActorReceiver, ReceiverSupervisorStrategy} import spark.streaming.dstream._ import spark.storage.StorageLevel - import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import spark.api.java.{JavaSparkContext, JavaRDD} - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - import twitter4j.Status - import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe - import scala.collection.JavaConversions._ - import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream import java.util.{Map => JMap} +import twitter4j.auth.Authorization /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -315,46 +310,78 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters, storageLevel) + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters) + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization */ def twitterStream( - username: String, - password: String + twitterAuth: Authorization ): JavaDStream[Status] = { - ssc.twitterStream(username, password) + ssc.twitterStream(Some(twitterAuth)) } + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream() + } + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index 0b01091a52..e0c654d385 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -3,27 +3,53 @@ package spark.streaming.dstream import spark._ import spark.streaming._ import storage.StorageLevel - import twitter4j._ import twitter4j.auth.BasicAuthorization import twitter4j.auth.Authorization +import java.util.prefs.Preferences +import twitter4j.conf.PropertyConfiguration +import twitter4j.auth.OAuthAuthorization +import twitter4j.auth.AccessToken /* A stream of Twitter statuses, potentially filtered by one or more keywords. * * @constructor create a new Twitter stream using the supplied username and password to authenticate. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. +* +* Includes a simple implementation of OAuth using consumer key and secret provided using system +* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret */ private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, - twitterAuth: Authorization, + twitterAuth: Option[Authorization], filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { + lazy val createOAuthAuthorization: Authorization = { + val userRoot = Preferences.userRoot(); + val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null)) + val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null)) + val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties())) + if (token.isEmpty || tokenSecret.isEmpty) { + val requestToken = oAuth.getOAuthRequestToken() + println("Authorize application using URL: "+requestToken.getAuthorizationURL()) + println("Enter PIN: ") + val pin = Console.readLine + val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken() + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken()) + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret()) + userRoot.flush() + } else { + oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get)); + } + oAuth + } + override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(twitterAuth, filters, storageLevel) + new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel) } } -- cgit v1.2.3