diff options
author | James Phillpotts <james.github@potes.org.uk> | 2013-06-25 22:59:34 +0100 |
---|---|---|
committer | James Phillpotts <james.github@potes.org.uk> | 2013-06-25 22:59:34 +0100 |
commit | 366572edcab87701fd795ca0142ac9829b312d36 (patch) | |
tree | f902e318243a8536ec3fb63ccb3efb07c4910674 /streaming/src/main | |
parent | 8955787a596216a35ad4ec52b57331aa40444bef (diff) | |
download | spark-366572edcab87701fd795ca0142ac9829b312d36.tar.gz spark-366572edcab87701fd795ca0142ac9829b312d36.tar.bz2 spark-366572edcab87701fd795ca0142ac9829b312d36.zip |
Include a default OAuth implementation, and update examples and JavaStreamingContext
Diffstat (limited to 'streaming/src/main')
3 files changed, 78 insertions, 25 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 05be6bd58a..0f36504c0d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -368,7 +368,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - twitterAuth: Authorization, + twitterAuth: Option[Authorization] = None, filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 3d149a742c..85390ef57e 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -4,23 +4,18 @@ import spark.streaming._ import receivers.{ActorReceiver, ReceiverSupervisorStrategy} import spark.streaming.dstream._ import spark.storage.StorageLevel - import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import spark.api.java.{JavaSparkContext, JavaRDD} - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - import twitter4j.Status - import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe - import scala.collection.JavaConversions._ - import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream import java.util.{Map => JMap} +import twitter4j.auth.Authorization /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -315,47 +310,79 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters, storageLevel) + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them */ def twitterStream( - username: String, - password: String, + twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - ssc.twitterStream(username, password, filters) + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters) } /** * Create a input stream that returns tweets received from Twitter. - * @param username Twitter username - * @param password Twitter password + * @param twitterAuth Twitter4J Authorization */ def twitterStream( - username: String, - password: String + twitterAuth: Authorization ): JavaDStream[Status] = { - ssc.twitterStream(username, password) + ssc.twitterStream(Some(twitterAuth)) } /** + * Create a input stream that returns tweets received from Twitter using + * java.util.Preferences to store OAuth token. OAuth key and secret should + * be provided using system properties twitter4j.oauth.consumerKey and + * twitter4j.oauth.consumerSecret + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream() + } + + /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index 0b01091a52..e0c654d385 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -3,27 +3,53 @@ package spark.streaming.dstream import spark._ import spark.streaming._ import storage.StorageLevel - import twitter4j._ import twitter4j.auth.BasicAuthorization import twitter4j.auth.Authorization +import java.util.prefs.Preferences +import twitter4j.conf.PropertyConfiguration +import twitter4j.auth.OAuthAuthorization +import twitter4j.auth.AccessToken /* A stream of Twitter statuses, potentially filtered by one or more keywords. * * @constructor create a new Twitter stream using the supplied username and password to authenticate. * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is * such that this may return a sampled subset of all tweets during each interval. +* +* Includes a simple implementation of OAuth using consumer key and secret provided using system +* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret */ private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, - twitterAuth: Authorization, + twitterAuth: Option[Authorization], filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { + lazy val createOAuthAuthorization: Authorization = { + val userRoot = Preferences.userRoot(); + val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null)) + val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null)) + val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties())) + if (token.isEmpty || tokenSecret.isEmpty) { + val requestToken = oAuth.getOAuthRequestToken() + println("Authorize application using URL: "+requestToken.getAuthorizationURL()) + println("Enter PIN: ") + val pin = Console.readLine + val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken() + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken()) + userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret()) + userRoot.flush() + } else { + oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get)); + } + oAuth + } + override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(twitterAuth, filters, storageLevel) + new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel) } } |