diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 17:49:43 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 17:49:43 -0800 |
commit | def8126d7788a8bd991ac6f9f9403de701a39dc5 (patch) | |
tree | aedd6ef98806069248149d0fdfd08e95dbc11e85 /streaming | |
parent | 2eacf22401f75b956036fb0c32eb38baa16b224e (diff) | |
download | spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.gz spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.bz2 spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.zip |
Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 52 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala | 70 |
2 files changed, 105 insertions, 17 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8cfbec51d2..9be9d884be 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID +import twitter4j.Status /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -30,14 +31,14 @@ class StreamingContext private ( ) extends Logging { /** - * Creates a StreamingContext using an existing SparkContext. + * Create a StreamingContext using an existing SparkContext. * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) /** - * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * Create a StreamingContext by providing the details necessary for creating a new SparkContext. * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). * @param frameworkName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches @@ -46,7 +47,7 @@ class StreamingContext private ( this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** - * Re-creates a StreamingContext from a checkpoint file. + * Re-create a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. */ @@ -101,12 +102,12 @@ class StreamingContext private ( protected[streaming] var scheduler: Scheduler = null /** - * Returns the associated Spark context + * Return the associated Spark context */ def sparkContext = sc /** - * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * Set each DStreams in this context to remember RDDs it generated in the last given duration. * DStreams remember RDDs only for a limited duration of time and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). @@ -117,7 +118,7 @@ class StreamingContext private ( } /** - * Sets the context to periodically checkpoint the DStream operations for master + * Set the context to periodically checkpoint the DStream operations for master * fault-tolerance. By default, the graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval @@ -200,7 +201,7 @@ class StreamingContext private ( } /** - * Creates a input stream from a Flume source. + * Create a input stream from a Flume source. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent * @param storageLevel Storage level to use for storing the received objects @@ -236,7 +237,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -255,7 +256,7 @@ class StreamingContext private ( } /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process @@ -274,9 +275,8 @@ class StreamingContext private ( inputStream } - /** - * Creates a input stream that monitors a Hadoop-compatible filesystem + * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value * as Text and input format as TextInputFormat). File names starting with . are ignored. * @param directory HDFS directory to monitor for new file @@ -286,7 +286,25 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create a input stream that returns tweets received from Twitter. + * @param username Twitter username + * @param password Twitter password + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -300,7 +318,7 @@ class StreamingContext private ( } /** - * Creates an input stream from a queue of RDDs. In each batch, + * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval @@ -325,7 +343,7 @@ class StreamingContext private ( } /** - * Registers an input stream that will be started (InputDStream.start() called) to get the + * Register an input stream that will be started (InputDStream.start() called) to get the * input data. */ def registerInputStream(inputStream: InputDStream[_]) { @@ -333,7 +351,7 @@ class StreamingContext private ( } /** - * Registers an output stream that will be computed every interval + * Register an output stream that will be computed every interval */ def registerOutputStream(outputStream: DStream[_]) { graph.addOutputStream(outputStream) @@ -351,7 +369,7 @@ class StreamingContext private ( } /** - * Starts the execution of the streams. + * Start the execution of the streams. */ def start() { if (checkpointDir != null && checkpointDuration == null && graph != null) { @@ -379,7 +397,7 @@ class StreamingContext private ( } /** - * Stops the execution of the streams. + * Stop the execution of the streams. */ def stop() { try { diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala new file mode 100644 index 0000000000..d733254ddb --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -0,0 +1,70 @@ +package spark.streaming.dstream + +import spark._ +import spark.streaming._ +import dstream.{NetworkReceiver, NetworkInputDStream} +import storage.StorageLevel +import twitter4j._ +import twitter4j.auth.BasicAuthorization + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +*/ +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkInputDStream[Status](ssc_) { + + override def createReceiver(): NetworkReceiver[Status] = { + new TwitterReceiver(username, password, filters, storageLevel) + } +} + +class TwitterReceiver( + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkReceiver[Status] { + + var twitterStream: TwitterStream = _ + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + blockGenerator.start() + twitterStream = new TwitterStreamFactory() + .getInstance(new BasicAuthorization(username, password)) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + blockGenerator += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { stopOnError(e) } + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + logInfo("Twitter receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + twitterStream.shutdown() + logInfo("Twitter receiver stopped") + } +} |