diff options
author | James Phillpotts <jphillpotts@scottlogic.co.uk> | 2013-06-21 14:21:52 +0100 |
---|---|---|
committer | James Phillpotts <jphillpotts@scottlogic.co.uk> | 2013-06-21 14:21:52 +0100 |
commit | 93a1643405d7c1a1fffe8210130341f34d64ea72 (patch) | |
tree | f70aba683c89c5caa800377a80cbffff0f0cec17 /streaming | |
parent | 1057fccf2ac980501501cc27faaf42770a7de9a0 (diff) | |
download | spark-93a1643405d7c1a1fffe8210130341f34d64ea72.tar.gz spark-93a1643405d7c1a1fffe8210130341f34d64ea72.tar.bz2 spark-93a1643405d7c1a1fffe8210130341f34d64ea72.zip |
Allow other twitter authorizations than username/password
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 15 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala | 14 |
2 files changed, 20 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index b8b60aab43..f97e47ada0 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import twitter4j.Status +import twitter4j.auth.{Authorization, BasicAuthorization} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -372,8 +373,20 @@ class StreamingContext private ( password: String, filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = twitterStream(new BasicAuthorization(username, password), filters, storageLevel) + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index c697498862..0b01091a52 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -6,6 +6,7 @@ import storage.StorageLevel import twitter4j._ import twitter4j.auth.BasicAuthorization +import twitter4j.auth.Authorization /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -16,21 +17,19 @@ import twitter4j.auth.BasicAuthorization private[streaming] class TwitterInputDStream( @transient ssc_ : StreamingContext, - username: String, - password: String, + twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel ) extends NetworkInputDStream[Status](ssc_) { - + override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) + new TwitterReceiver(twitterAuth, filters, storageLevel) } } private[streaming] class TwitterReceiver( - username: String, - password: String, + twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel ) extends NetworkReceiver[Status] { @@ -40,8 +39,7 @@ class TwitterReceiver( protected override def onStart() { blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) + twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) twitterStream.addListener(new StatusListener { def onStatus(status: Status) = { blockGenerator += status |