author    Tathagata Das <tathagata.das1565@gmail.com>  2013-02-14 17:49:43 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-02-14 17:49:43 -0800
commit    def8126d7788a8bd991ac6f9f9403de701a39dc5
tree      aedd6ef98806069248149d0fdfd08e95dbc11e85 /streaming
parent    2eacf22401f75b956036fb0c32eb38baa16b224e
Added TwitterInputDStream from the examples to StreamingContext. Renamed the TwitterBasic example to TwitterPopularTags.
Diffstat (limited to 'streaming')
 streaming/src/main/scala/spark/streaming/StreamingContext.scala            | 52
 streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala | 70
 2 files changed, 105 insertions(+), 17 deletions(-)
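
The commit wires the new source into StreamingContext as a twitterStream method, so a job can consume tweets without constructing the DStream by hand. A minimal sketch of driving it (the credentials and the filter term are placeholders, and the local master URL is just one option):

    import spark.streaming.{Seconds, StreamingContext}

    // Placeholders only: substitute real Twitter credentials and filter terms.
    val ssc = new StreamingContext("local[4]", "TwitterPopularTags", Seconds(2))
    val tweets = ssc.twitterStream("username", "password", Seq("spark"))
    tweets.map(_.getText).print()   // print a sample of tweet texts each batch
    ssc.start()
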
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8cfbec51d2..9be9d884be 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import twitter4j.Status
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -30,14 +31,14 @@ class StreamingContext private (
) extends Logging {
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a StreamingContext using an existing SparkContext.
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
/**
- * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param frameworkName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -46,7 +47,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Re-create a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -101,12 +102,12 @@ class StreamingContext private (
protected[streaming] var scheduler: Scheduler = null
/**
- * Returns the associated Spark context
+ * Return the associated Spark context
*/
def sparkContext = sc
/**
- * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+ * Set each DStream in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs
* (if the developer wishes to query old data outside the DStream computation).
@@ -117,7 +118,7 @@ class StreamingContext private (
}
/**
- * Sets the context to periodically checkpoint the DStream operations for master
+ * Set the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
@@ -200,7 +201,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream from a Flume source.
+ * Create an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
@@ -236,7 +237,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
@@ -255,7 +256,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* @param directory HDFS directory to monitor for new files
* @param filter Function to filter paths to process
@@ -274,9 +275,8 @@ class StreamingContext private (
inputStream
}
-
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
@@ -286,7 +286,25 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -300,7 +318,7 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -325,7 +343,7 @@ class StreamingContext private (
}
/**
- * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
@@ -333,7 +351,7 @@ class StreamingContext private (
}
/**
- * Registers an output stream that will be computed every interval
+ * Register an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
@@ -351,7 +369,7 @@ class StreamingContext private (
}
/**
- * Starts the execution of the streams.
+ * Start the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointDuration == null && graph != null) {
@@ -379,7 +397,7 @@ class StreamingContext private (
}
/**
- * Stops the execution of the streams.
+ * Stop the execution of the streams.
*/
def stop() {
try {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
new file mode 100644
index 0000000000..d733254ddb
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -0,0 +1,70 @@
+package spark.streaming.dstream
+
+import spark._
+import spark.streaming._
+import dstream.{NetworkReceiver, NetworkInputDStream}
+import storage.StorageLevel
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+
+/** A stream of Twitter statuses, potentially filtered by one or more keywords.
+ *
+ * @constructor create a new Twitter stream using the supplied username and password to authenticate.
+ * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+ * such that this may return a sampled subset of all tweets during each interval.
+ */
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkInputDStream[Status](ssc_) {
+
+ override def createReceiver(): NetworkReceiver[Status] = {
+ new TwitterReceiver(username, password, filters, storageLevel)
+ }
+}
+
+class TwitterReceiver(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Status] {
+
+ var twitterStream: TwitterStream = _
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
+
+ protected override def onStart() {
+ blockGenerator.start()
+ twitterStream = new TwitterStreamFactory()
+ .getInstance(new BasicAuthorization(username, password))
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ blockGenerator += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) { stopOnError(e) }
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ logInfo("Twitter receiver started")
+ }
+
+ protected override def onStop() {
+ blockGenerator.stop()
+ twitterStream.shutdown()
+ logInfo("Twitter receiver stopped")
+ }
+}
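
TwitterReceiver above also serves as a template for other push-based sources: start a BlockGenerator, feed it items as they arrive, and report failures through stopOnError. As a hedged sketch only, here is a hypothetical line-oriented receiver written against the same NetworkReceiver/BlockGenerator surface shown in this diff; the socket source and the thread handling are assumptions for illustration, not part of this commit:

    package spark.streaming.dstream

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket

    import spark.storage.StorageLevel

    // Hypothetical receiver modeled on TwitterReceiver above: it batches
    // received lines into blocks via the same BlockGenerator contract.
    class LineReceiver(
        host: String,
        port: Int,
        storageLevel: StorageLevel
      ) extends NetworkReceiver[String] {

      lazy val blockGenerator = new BlockGenerator(storageLevel)

      protected override def onStart() {
        blockGenerator.start()
        // Read on a daemon thread so onStart() returns promptly, mirroring
        // the asynchronous delivery of statuses by twitter4j above.
        val thread = new Thread("LineReceiver") {
          override def run() {
            try {
              val socket = new Socket(host, port)
              val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
              var line = reader.readLine()
              while (line != null) {
                blockGenerator += line   // hand each item to the block generator
                line = reader.readLine()
              }
            } catch {
              case e: Exception => stopOnError(e)
            }
          }
        }
        thread.setDaemon(true)
        thread.start()
        logInfo("Line receiver started")
      }

      protected override def onStop() {
        blockGenerator.stop()
        logInfo("Line receiver stopped")
      }
    }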