aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorJames Phillpotts <jphillpotts@scottlogic.co.uk>2013-06-21 14:21:52 +0100
committerJames Phillpotts <jphillpotts@scottlogic.co.uk>2013-06-21 14:21:52 +0100
commit93a1643405d7c1a1fffe8210130341f34d64ea72 (patch)
treef70aba683c89c5caa800377a80cbffff0f0cec17 /streaming/src
parent1057fccf2ac980501501cc27faaf42770a7de9a0 (diff)
downloadspark-93a1643405d7c1a1fffe8210130341f34d64ea72.tar.gz
spark-93a1643405d7c1a1fffe8210130341f34d64ea72.tar.bz2
spark-93a1643405d7c1a1fffe8210130341f34d64ea72.zip
Allow other twitter authorizations than username/password
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala14
2 files changed, 20 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b8b60aab43..f97e47ada0 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
+import twitter4j.auth.{Authorization, BasicAuthorization}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -372,8 +373,20 @@ class StreamingContext private (
password: String,
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = twitterStream(new BasicAuthorization(username, password), filters, storageLevel)
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Authorization,
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
- val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
registerInputStream(inputStream)
inputStream
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index c697498862..0b01091a52 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -6,6 +6,7 @@ import storage.StorageLevel
import twitter4j._
import twitter4j.auth.BasicAuthorization
+import twitter4j.auth.Authorization
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -16,21 +17,19 @@ import twitter4j.auth.BasicAuthorization
private[streaming]
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
- username: String,
- password: String,
+ twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
-
+
override def getReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(username, password, filters, storageLevel)
+ new TwitterReceiver(twitterAuth, filters, storageLevel)
}
}
private[streaming]
class TwitterReceiver(
- username: String,
- password: String,
+ twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkReceiver[Status] {
@@ -40,8 +39,7 @@ class TwitterReceiver(
protected override def onStart() {
blockGenerator.start()
- twitterStream = new TwitterStreamFactory()
- .getInstance(new BasicAuthorization(username, password))
+ twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
blockGenerator += status